complete_agent_flow_rule.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. """
  2. 完整的智能体工作流 (Complete Agent Flow)
  3. =====================================
  4. 此工作流整合了规划、大纲生成和指标计算四个核心智能体,实现完整的报告生成流程。
  5. 包含的智能体:
  6. 1. PlanningAgent (规划智能体) - 分析状态并做出决策
  7. 2. OutlineAgent (大纲生成智能体) - 生成报告结构和指标需求
  8. 3. MetricCalculationAgent (指标计算智能体) - 执行标准指标计算
  9. 4. RulesEngineMetricCalculationAgent (规则引擎指标计算智能体) - 执行规则引擎指标计算
  10. 工作流程:
  11. 1. 规划节点 → 分析当前状态,决定下一步行动
  12. 2. 大纲生成节点 → 生成报告大纲和指标需求
  13. 3. 指标判断节点 → 根据大纲确定需要计算的指标
  14. 4. 指标计算节点 → 执行具体的指标计算任务
  15. 技术特点:
  16. - 基于LangGraph的状态机工作流
  17. - 支持条件路由和状态管理
  18. - 完善的错误处理机制
  19. - 详细的执行日志记录
  20. 作者: Big Agent Team
  21. 版本: 1.0.0
  22. 创建时间: 2024-12-20
  23. """
  24. import asyncio
  25. from typing import Dict, Any, List
  26. from datetime import datetime
  27. from langgraph.graph import StateGraph, START, END
  28. from langchain_core.messages import HumanMessage
  29. from workflow_state import (
  30. IntegratedWorkflowState,
  31. create_initial_integrated_state,
  32. is_state_ready_for_calculation,
  33. get_calculation_progress,
  34. update_state_with_outline_generation,
  35. update_state_with_planning_decision,
  36. finalize_state_with_report,
  37. convert_numpy_types,
  38. MetricRequirement,
  39. ReportOutline
  40. )
  41. from llmops.agents.outline_agent import OutlineGeneratorAgent, generate_report_outline
  42. from llmops.agents.planning_agent import PlanningAgent, plan_next_action, analyze_current_state
  43. from llmops.agents.metric_calculation_agent import MetricCalculationAgent
  44. from llmops.agents.rules_engine_metric_calculation_agent import RulesEngineMetricCalculationAgent
  45. class CompleteAgentFlow:
  46. """完整的智能体工作流"""
  47. def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
  48. """
  49. 初始化完整的工作流
  50. Args:
  51. api_key: DeepSeek API密钥
  52. base_url: DeepSeek API基础URL
  53. """
  54. self.api_key = api_key
  55. self.base_url = base_url
  56. # 初始化各个智能体
  57. self.planning_agent = PlanningAgent(api_key, base_url)
  58. self.outline_agent = OutlineGeneratorAgent(api_key, base_url)
  59. # self.metric_agent = MetricCalculationAgent(api_key, base_url)
  60. self.rules_engine_agent = RulesEngineMetricCalculationAgent(api_key, base_url)
  61. # 创建工作流图
  62. self.workflow = self._create_workflow()
  63. def _create_workflow(self) -> StateGraph:
  64. """创建LangGraph工作流"""
  65. workflow = StateGraph(IntegratedWorkflowState)
  66. # 添加节点
  67. workflow.add_node("planning_node", self._planning_node)
  68. workflow.add_node("outline_generator", self._outline_generator_node)
  69. workflow.add_node("metric_evaluator", self._metric_evaluator_node)
  70. workflow.add_node("metric_calculator", self._metric_calculator_node)
  71. workflow.add_node("report_finalizer", self._report_finalizer_node)
  72. # 设置入口点
  73. workflow.set_entry_point("planning_node")
  74. # 添加条件边 - 基于规划决策路由
  75. workflow.add_conditional_edges(
  76. "planning_node",
  77. self._route_from_planning,
  78. {
  79. "outline_generator": "outline_generator",
  80. "metric_evaluator": "metric_evaluator",
  81. "metric_calculator": "metric_calculator",
  82. "report_finalizer": "report_finalizer",
  83. END: END
  84. }
  85. )
  86. # 从各个节点返回规划节点重新决策
  87. workflow.add_edge("outline_generator", "planning_node")
  88. workflow.add_edge("metric_evaluator", "planning_node")
  89. workflow.add_edge("metric_calculator", "planning_node")
  90. workflow.add_edge("report_finalizer", END)
  91. return workflow
  92. def _route_from_planning(self, state: IntegratedWorkflowState) -> str:
  93. """
  94. 从规划节点路由到下一个节点
  95. Args:
  96. state: 当前状态
  97. Returns:
  98. 目标节点名称
  99. """
  100. print(f"\n🔍 [路由决策] 步骤={state['planning_step']}, "
  101. f"大纲={state.get('outline_draft') is not None}, "
  102. f"指标需求={len(state.get('metrics_requirements', []))}")
  103. # 防止无限循环
  104. if state['planning_step'] > 30:
  105. print("⚠️ 规划步骤超过30次,强制结束流程")
  106. return END
  107. # 如果大纲为空 → 生成大纲
  108. if not state.get("outline_draft"):
  109. print("→ 路由到 outline_generator(生成大纲)")
  110. return "outline_generator"
  111. # 如果指标需求为空但大纲已生成 → 评估指标需求
  112. if not state.get("metrics_requirements") and state.get("outline_draft"):
  113. print("→ 路由到 metric_evaluator(评估指标需求)")
  114. return "metric_evaluator"
  115. # 计算覆盖率
  116. progress = get_calculation_progress(state)
  117. coverage = progress["coverage_rate"]
  118. print(f" 指标覆盖率 = {coverage:.2%}")
  119. # 如果有待计算指标且覆盖率 < 100% → 计算指标
  120. if state.get("pending_metric_ids") and coverage < 1.0:
  121. print(f"→ 路由到 metric_calculator(计算指标,覆盖率={coverage:.2%})")
  122. return "metric_calculator"
  123. # 如果没有待计算指标或覆盖率 >= 80% → 生成最终报告
  124. if not state.get("pending_metric_ids") or coverage >= 0.8:
  125. print(f"→ 路由到 report_finalizer(生成最终报告,覆盖率={coverage:.2%})")
  126. return "report_finalizer"
  127. # 默认返回规划节点
  128. return "planning_node"
  129. async def _planning_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  130. """规划节点:分析状态并做出决策"""
  131. try:
  132. print("🧠 正在执行规划分析...")
  133. # 使用规划智能体做出决策
  134. decision = await plan_next_action(
  135. question=state["question"],
  136. industry=state["industry"],
  137. current_state=state,
  138. api_key=self.api_key
  139. )
  140. # 更新状态
  141. new_state = update_state_with_planning_decision(state, {
  142. "decision": decision.decision,
  143. "next_route": self._decision_to_route(decision.decision),
  144. "metrics_to_compute": decision.metrics_to_compute
  145. })
  146. # 添加决策消息
  147. decision_msg = self._format_decision_message(decision)
  148. new_state["messages"].append({
  149. "role": "assistant",
  150. "content": decision_msg,
  151. "timestamp": datetime.now().isoformat()
  152. })
  153. print(f"✅ 规划决策完成:{decision.decision}")
  154. return convert_numpy_types(new_state)
  155. except Exception as e:
  156. print(f"❌ 规划节点执行失败: {e}")
  157. new_state = state.copy()
  158. new_state["errors"].append(f"规划节点错误: {str(e)}")
  159. return convert_numpy_types(new_state)
  160. async def _outline_generator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  161. """大纲生成节点"""
  162. try:
  163. print("📝 正在生成报告大纲...")
  164. # 生成大纲(支持重试机制)
  165. outline = await generate_report_outline(
  166. question=state["question"],
  167. industry=state["industry"],
  168. sample_data=state["data_set"][:3], # 使用前3个样本
  169. api_key=self.api_key,
  170. max_retries=1, # 最多重试5次
  171. retry_delay=3.0 # 每次重试间隔3秒
  172. )
  173. # 更新状态
  174. new_state = update_state_with_outline_generation(state, outline)
  175. print(f"✅ 大纲生成完成:{outline.report_title}")
  176. print(f" 包含 {len(outline.sections)} 个章节,{len(outline.global_metrics)} 个指标需求")
  177. return convert_numpy_types(new_state)
  178. except Exception as e:
  179. print(f"❌ 大纲生成失败: {e}")
  180. new_state = state.copy()
  181. new_state["errors"].append(f"大纲生成错误: {str(e)}")
  182. return convert_numpy_types(new_state)
  183. async def _metric_evaluator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  184. """指标评估节点:根据大纲确定需要计算的指标"""
  185. try:
  186. print("🔍 正在评估指标需求...")
  187. new_state = state.copy()
  188. outline = state.get("outline_draft")
  189. if not outline:
  190. print("⚠️ 没有大纲信息,跳过指标评估")
  191. return convert_numpy_types(new_state)
  192. # 从大纲中提取指标需求
  193. metrics_requirements = outline.global_metrics
  194. metric_ids = [m.metric_id for m in metrics_requirements]
  195. # 设置待计算指标
  196. new_state["metrics_requirements"] = metrics_requirements
  197. new_state["pending_metric_ids"] = metric_ids.copy()
  198. new_state["computed_metrics"] = {}
  199. new_state["metrics_cache"] = {}
  200. print(f"✅ 指标评估完成,发现 {len(metric_ids)} 个待计算指标")
  201. for i, metric_id in enumerate(metric_ids[:5], 1): # 只显示前5个
  202. print(f" {i}. {metric_id}")
  203. if len(metric_ids) > 5:
  204. print(f" ... 还有 {len(metric_ids) - 5} 个指标")
  205. # 添加消息
  206. new_state["messages"].append({
  207. "role": "assistant",
  208. "content": f"🔍 指标评估完成:发现 {len(metric_ids)} 个待计算指标",
  209. "timestamp": datetime.now().isoformat()
  210. })
  211. return convert_numpy_types(new_state)
  212. except Exception as e:
  213. print(f"❌ 指标评估失败: {e}")
  214. new_state = state.copy()
  215. new_state["errors"].append(f"指标评估错误: {str(e)}")
  216. return convert_numpy_types(new_state)
  217. async def _metric_calculator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  218. """指标计算节点"""
  219. try:
  220. # 检查计算模式
  221. use_rules_engine_only = state.get("use_rules_engine_only", False)
  222. use_traditional_engine_only = state.get("use_traditional_engine_only", False)
  223. if use_rules_engine_only:
  224. print("🧮 正在执行规则引擎指标计算(专用模式)...")
  225. elif use_traditional_engine_only:
  226. print("🧮 正在执行传统引擎指标计算(专用模式)...")
  227. else:
  228. print("🧮 正在执行指标计算...")
  229. new_state = state.copy()
  230. pending_ids = state.get("pending_metric_ids", [])
  231. if not pending_ids:
  232. print("⚠️ 没有待计算的指标")
  233. return convert_numpy_types(new_state)
  234. # 获取指标需求信息
  235. metrics_requirements = state.get("metrics_requirements", [])
  236. if not metrics_requirements:
  237. print("⚠️ 没有指标需求信息")
  238. return convert_numpy_types(new_state)
  239. # 计算成功和失败的指标
  240. successful_calculations = 0
  241. failed_calculations = 0
  242. # 遍历待计算的指标(创建副本避免修改时遍历的问题)
  243. for metric_id in pending_ids.copy():
  244. try:
  245. # 找到对应的指标需求
  246. metric_req = next((m for m in metrics_requirements if m.metric_id == metric_id), None)
  247. if not metric_req:
  248. print(f"⚠️ 找不到指标 {metric_id} 的需求信息,跳过")
  249. # 仍然从待计算列表中移除,避免无限循环
  250. if metric_id in new_state["pending_metric_ids"]:
  251. new_state["pending_metric_ids"].remove(metric_id)
  252. continue
  253. print(f"🧮 计算指标: {metric_id} - {metric_req.metric_name}")
  254. # 根据模式决定使用哪种计算方式
  255. if use_rules_engine_only:
  256. # 只使用规则引擎计算
  257. use_rules_engine = True
  258. print(f" 使用规则引擎模式")
  259. elif use_traditional_engine_only:
  260. # 只使用传统引擎计算
  261. use_rules_engine = False
  262. print(f" 使用传统引擎模式")
  263. else:
  264. # 自动选择计算方式:优先使用规则引擎,只在规则引擎不可用时使用传统计算
  265. use_rules_engine = True # 默认使用规则引擎计算所有指标
  266. if use_rules_engine:
  267. # 使用规则引擎计算
  268. # 现在metric_id已经是知识ID,直接使用它作为配置名
  269. config_name = metric_id # metric_id 已经是知识ID,如 "metric-分析账户数量"
  270. intent_result = {
  271. "target_configs": [config_name],
  272. "intent_category": "指标计算"
  273. }
  274. print(f" 使用知识ID: {config_name}")
  275. results = await self.rules_engine_agent.calculate_metrics(intent_result)
  276. else:
  277. # 使用传统指标计算(模拟)
  278. # 这里简化处理,实际应该根据配置文件调用相应的API
  279. results = {
  280. "success": True,
  281. "results": [{
  282. "config_name": metric_req.metric_id,
  283. "result": {
  284. "success": True,
  285. "data": f"传统引擎计算结果:{metric_req.metric_name}",
  286. "value": 100.0 # 模拟数值
  287. }
  288. }]
  289. }
  290. # 处理计算结果
  291. for result in results.get("results", []):
  292. if result.get("result", {}).get("success"):
  293. # 计算成功
  294. new_state["computed_metrics"][metric_id] = result["result"]
  295. successful_calculations += 1
  296. print(f"✅ 指标 {metric_id} 计算成功")
  297. else:
  298. # 计算失败
  299. failed_calculations += 1
  300. print(f"❌ 指标 {metric_id} 计算失败")
  301. # 从待计算列表中移除(无论成功还是失败)
  302. if metric_id in new_state["pending_metric_ids"]:
  303. new_state["pending_metric_ids"].remove(metric_id)
  304. except Exception as e:
  305. print(f"❌ 计算指标 {metric_id} 时发生异常: {e}")
  306. failed_calculations += 1
  307. # 即使异常,也要从待计算列表中移除,避免无限循环
  308. if metric_id in new_state["pending_metric_ids"]:
  309. new_state["pending_metric_ids"].remove(metric_id)
  310. # 更新计算结果统计
  311. new_state["calculation_results"] = {
  312. "total_configs": len(pending_ids),
  313. "successful_calculations": successful_calculations,
  314. "failed_calculations": failed_calculations
  315. }
  316. # 添加消息
  317. if use_rules_engine_only:
  318. message_content = f"🧮 规则引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  319. elif use_traditional_engine_only:
  320. message_content = f"🧮 传统引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  321. else:
  322. message_content = f"🧮 指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  323. new_state["messages"].append({
  324. "role": "assistant",
  325. "content": message_content,
  326. "timestamp": datetime.now().isoformat()
  327. })
  328. if use_rules_engine_only:
  329. print(f"✅ 规则引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  330. elif use_traditional_engine_only:
  331. print(f"✅ 传统引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  332. else:
  333. print(f"✅ 指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  334. return convert_numpy_types(new_state)
  335. except Exception as e:
  336. print(f"❌ 指标计算节点失败: {e}")
  337. new_state = state.copy()
  338. new_state["errors"].append(f"指标计算错误: {str(e)}")
  339. return convert_numpy_types(new_state)
  340. async def _report_finalizer_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  341. """报告完成节点:生成最终报告"""
  342. try:
  343. print("📋 正在生成最终报告...")
  344. # 获取大纲和计算结果
  345. outline = state.get("outline_draft")
  346. computed_metrics = state.get("computed_metrics", {})
  347. if not outline:
  348. raise ValueError("没有可用的报告大纲")
  349. # 生成最终报告
  350. final_report = {
  351. "title": outline.report_title,
  352. "generated_at": datetime.now().isoformat(),
  353. "summary": {
  354. "total_sections": len(outline.sections),
  355. "total_metrics_required": len(outline.global_metrics),
  356. "total_metrics_computed": len(computed_metrics),
  357. "planning_steps": state.get("planning_step", 0),
  358. "completion_rate": len(computed_metrics) / len(outline.global_metrics) if outline.global_metrics else 0
  359. },
  360. "sections": [],
  361. "metrics_detail": {}
  362. }
  363. # 构建章节内容
  364. for section in outline.sections:
  365. section_content = {
  366. "section_id": section.section_id,
  367. "title": section.title,
  368. "description": section.description,
  369. "metrics": {}
  370. }
  371. # 添加该章节的指标数据
  372. for metric_id in section.metrics_needed:
  373. if metric_id in computed_metrics:
  374. section_content["metrics"][metric_id] = computed_metrics[metric_id]
  375. else:
  376. section_content["metrics"][metric_id] = "数据缺失"
  377. final_report["sections"].append(section_content)
  378. # 添加详细的指标信息
  379. for metric_req in outline.global_metrics:
  380. metric_id = metric_req.metric_id
  381. final_report["metrics_detail"][metric_id] = {
  382. "name": metric_req.metric_name,
  383. "logic": metric_req.calculation_logic,
  384. "required_fields": metric_req.required_fields,
  385. "computed": metric_id in computed_metrics,
  386. "value": computed_metrics.get(metric_id, {}).get("value", "N/A")
  387. }
  388. # 更新状态
  389. new_state = finalize_state_with_report(state, final_report)
  390. # 添加完成消息
  391. new_state["messages"].append({
  392. "role": "assistant",
  393. "content": f"🎉 完整报告生成流程完成:{outline.report_title}",
  394. "timestamp": datetime.now().isoformat()
  395. })
  396. print(f"✅ 最终报告生成完成:{outline.report_title}")
  397. print(f" 章节数:{len(final_report['sections'])}")
  398. print(f" 计算指标:{len(computed_metrics)}/{len(outline.global_metrics)}")
  399. print(".2%")
  400. return convert_numpy_types(new_state)
  401. except Exception as e:
  402. print(f"❌ 报告完成失败: {e}")
  403. new_state = state.copy()
  404. new_state["errors"].append(f"报告完成错误: {str(e)}")
  405. return convert_numpy_types(new_state)
  406. def _decision_to_route(self, decision: str) -> str:
  407. """将规划决策转换为路由"""
  408. decision_routes = {
  409. "generate_outline": "outline_generator",
  410. "compute_metrics": "metric_calculator",
  411. "finalize_report": "report_finalizer"
  412. }
  413. return decision_routes.get(decision, "planning_node")
  414. def _format_decision_message(self, decision: Any) -> str:
  415. """格式化决策消息"""
  416. try:
  417. decision_type = getattr(decision, 'decision', 'unknown')
  418. reasoning = getattr(decision, 'reasoning', '')
  419. if decision_type == "compute_metrics" and hasattr(decision, 'metrics_to_compute'):
  420. metrics = decision.metrics_to_compute
  421. return f"🧮 规划决策:计算 {len(metrics)} 个指标"
  422. elif decision_type == "finalize_report":
  423. return f"✅ 规划决策:生成最终报告"
  424. elif decision_type == "generate_outline":
  425. return f"📋 规划决策:生成大纲"
  426. else:
  427. return f"🤔 规划决策:{decision_type}"
  428. except:
  429. return "🤔 规划决策已完成"
  430. async def run_workflow(self, question: str, industry: str, data: List[Dict[str, Any]], session_id: str = None, use_rules_engine_only: bool = False, use_traditional_engine_only: bool = False) -> Dict[str, Any]:
  431. """
  432. 运行完整的工作流
  433. Args:
  434. question: 用户查询
  435. industry: 行业
  436. data: 数据集
  437. session_id: 会话ID
  438. use_rules_engine_only: 是否只使用规则引擎指标计算
  439. use_traditional_engine_only: 是否只使用传统引擎指标计算
  440. Returns:
  441. 工作流结果
  442. """
  443. try:
  444. print("🚀 启动完整智能体工作流...")
  445. print(f"问题:{question}")
  446. print(f"行业:{industry}")
  447. print(f"数据条数:{len(data)}")
  448. if use_rules_engine_only:
  449. print("计算模式:只使用规则引擎")
  450. elif use_traditional_engine_only:
  451. print("计算模式:只使用传统引擎")
  452. else:
  453. print("计算模式:标准模式")
  454. # 创建初始状态
  455. initial_state = create_initial_integrated_state(question, industry, data, session_id)
  456. # 设置计算模式标记
  457. if use_rules_engine_only:
  458. initial_state["use_rules_engine_only"] = True
  459. initial_state["use_traditional_engine_only"] = False
  460. elif use_traditional_engine_only:
  461. initial_state["use_rules_engine_only"] = False
  462. initial_state["use_traditional_engine_only"] = True
  463. else:
  464. initial_state["use_rules_engine_only"] = False
  465. initial_state["use_traditional_engine_only"] = False
  466. # 编译工作流
  467. app = self.workflow.compile()
  468. # 执行工作流
  469. result = await app.ainvoke(initial_state)
  470. print("✅ 工作流执行完成")
  471. return {
  472. "success": True,
  473. "result": result,
  474. "answer": result.get("answer"),
  475. "report": result.get("report_draft"),
  476. "session_id": result.get("session_id"),
  477. "execution_summary": {
  478. "planning_steps": result.get("planning_step", 0),
  479. "outline_generated": result.get("outline_draft") is not None,
  480. "metrics_computed": len(result.get("computed_metrics", {})),
  481. "completion_rate": result.get("completeness_score", 0)
  482. }
  483. }
  484. except Exception as e:
  485. print(f"❌ 工作流执行失败: {e}")
  486. return {
  487. "success": False,
  488. "error": str(e),
  489. "result": None
  490. }
  491. # 便捷函数
  492. async def run_complete_agent_flow(question: str, industry: str, data: List[Dict[str, Any]], api_key: str, session_id: str = None, use_rules_engine_only: bool = False, use_traditional_engine_only: bool = False) -> Dict[str, Any]:
  493. """
  494. 运行完整智能体工作流的便捷函数
  495. Args:
  496. question: 用户查询
  497. data: 数据集
  498. api_key: API密钥
  499. session_id: 会话ID
  500. use_rules_engine_only: 是否只使用规则引擎指标计算
  501. use_traditional_engine_only: 是否只使用传统引擎指标计算
  502. Returns:
  503. 工作流结果
  504. """
  505. workflow = CompleteAgentFlow(api_key)
  506. return await workflow.run_workflow(question, industry, data, session_id, use_rules_engine_only, use_traditional_engine_only)
  507. # 主函数用于测试
  508. async def main():
  509. """主函数:执行系统测试"""
  510. print("🚀 执行CompleteAgentFlow系统测试")
  511. print("=" * 50)
  512. # 导入配置
  513. import config
  514. if not config.DEEPSEEK_API_KEY:
  515. print("❌ 未找到API密钥")
  516. return
  517. # 测试数据
  518. test_data = [
  519. {
  520. }
  521. ]
  522. print(f"📊 测试数据: {len(test_data)} 条记录")
  523. # 执行测试
  524. result = await run_complete_agent_flow(
  525. question="请生成一份详细的农业经营贷流水分析报告,需要包含:1.总收入和总支出统计 2.收入笔数和支出笔数 3.各类型收入支出占比分析 4.交易对手收入支出TOP3排名 5.按月份的收入支出趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标",
  526. industry = "农业",
  527. data=test_data,
  528. api_key=config.DEEPSEEK_API_KEY,
  529. session_id="direct-test"
  530. )
  531. print(f"📋 结果: {'✅ 成功' if result.get('success') else '❌ 失败'}")
  532. if result.get('success'):
  533. summary = result.get('execution_summary', {})
  534. print(f" 规划步骤: {summary.get('planning_steps', 0)}")
  535. print(f" 指标计算: {summary.get('metrics_computed', 0)}")
  536. print("🎉 测试成功!")
  537. if __name__ == "__main__":
  538. import asyncio
  539. asyncio.run(main())