complete_agent_flow_rule.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. """
  2. 完整的智能体工作流 (Complete Agent Flow)
  3. =====================================
  4. 此工作流整合了规划、大纲生成和指标计算四个核心智能体,实现完整的报告生成流程。
  5. 包含的智能体:
  6. 1. PlanningAgent (规划智能体) - 分析状态并做出决策
  7. 2. OutlineAgent (大纲生成智能体) - 生成报告结构和指标需求
  8. 3. MetricCalculationAgent (指标计算智能体) - 执行标准指标计算
  9. 4. RulesEngineMetricCalculationAgent (规则引擎指标计算智能体) - 执行规则引擎指标计算
  10. 工作流程:
  11. 1. 规划节点 → 分析当前状态,决定下一步行动
  12. 2. 大纲生成节点 → 生成报告大纲和指标需求
  13. 3. 指标判断节点 → 根据大纲确定需要计算的指标
  14. 4. 指标计算节点 → 执行具体的指标计算任务
  15. 技术特点:
  16. - 基于LangGraph的状态机工作流
  17. - 支持条件路由和状态管理
  18. - 完善的错误处理机制
  19. - 详细的执行日志记录
  20. 作者: Big Agent Team
  21. 版本: 1.0.0
  22. 创建时间: 2024-12-20
  23. """
  24. import asyncio
  25. from typing import Dict, Any, List
  26. from datetime import datetime
  27. from langgraph.graph import StateGraph, START, END
  28. from .workflow_state import (
  29. IntegratedWorkflowState,
  30. create_initial_integrated_state,
  31. get_calculation_progress,
  32. update_state_with_outline_generation,
  33. update_state_with_planning_decision,
  34. update_state_with_data_classified,
  35. convert_numpy_types,
  36. )
  37. from .agents.outline_agent import generate_report_outline
  38. from .agents.planning_agent import plan_next_action
  39. from .agents.rules_engine_metric_calculation_agent import RulesEngineMetricCalculationAgent
  40. from .agents.data_manager import DataManager
  41. import os
  42. from .agents.data_classify_agent import data_classify
  43. class CompleteAgentFlow:
  44. """完整的智能体工作流"""
  45. def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com", run_id: str = None):
  46. """
  47. 初始化完整的工作流
  48. Args:
  49. api_key: DeepSeek API密钥
  50. base_url: DeepSeek API基础URL
  51. run_id: 运行ID,用于区分不同的运行实例(可选)
  52. """
  53. self.api_key = api_key
  54. self.base_url = base_url
  55. self.run_id = run_id or "default"
  56. # 设置环境变量,让所有agent使用统一的文件夹
  57. os.environ['FLOW_RUN_ID'] = self.run_id
  58. # 初始规则引擎智能体
  59. self.rules_engine_agent = RulesEngineMetricCalculationAgent(api_key, base_url)
  60. # 创建工作流图
  61. self.workflow = self._create_workflow()
  62. def _create_workflow(self) -> StateGraph:
  63. """创建LangGraph工作流"""
  64. workflow = StateGraph(IntegratedWorkflowState)
  65. # 添加节点
  66. workflow.add_node("planning_node", self._planning_node)
  67. workflow.add_node("outline_generator", self._outline_generator_node)
  68. workflow.add_node("metric_calculator", self._metric_calculator_node)
  69. workflow.add_node("data_classify", self._data_classify_node)
  70. # 设置入口点
  71. workflow.set_entry_point("planning_node")
  72. # 添加条件边 - 基于规划决策路由
  73. workflow.add_conditional_edges(
  74. "planning_node",
  75. self._route_from_planning,
  76. {
  77. "outline_generator": "outline_generator",
  78. "metric_calculator": "metric_calculator",
  79. "data_classify": "data_classify",
  80. END: END
  81. }
  82. )
  83. # 从各个节点返回规划节点重新决策
  84. workflow.add_edge("data_classify", "planning_node")
  85. workflow.add_edge("outline_generator", "planning_node")
  86. workflow.add_edge("metric_calculator", "planning_node")
  87. return workflow
  88. def _route_from_planning(self, state: IntegratedWorkflowState) -> str:
  89. """
  90. 从规划节点路由到下一个节点
  91. Args:
  92. state: 当前状态
  93. Returns:
  94. 目标节点名称
  95. """
  96. print(f"\n🔍 [路由决策] 步骤={state['planning_step']}, "
  97. f"数据集分类打标数量={len(state.get('data_set_classified', []))}",
  98. f"大纲={state.get('outline_draft') is not None}, "
  99. f"指标需求={len(state.get('metrics_requirements', []))}")
  100. # 防止无限循环
  101. if state['planning_step'] > 30:
  102. print("⚠️ 规划步骤超过30次,强制结束流程")
  103. return END
  104. # 数据分类打标数量为0 → 分类打标
  105. if len(state.get("data_set_classified", [])) == 0:
  106. print("→ 路由到 data_classify(分类打标)")
  107. return "data_classify"
  108. # 如果大纲为空 → 生成大纲
  109. if not state.get("outline_draft"):
  110. print("→ 路由到 outline_generator(生成大纲)")
  111. return "outline_generator"
  112. # 如果指标需求为空但大纲已生成 → 评估指标需求
  113. if not state.get("metrics_requirements") and state.get("outline_draft"):
  114. print("→ 路由到 metric_evaluator(评估指标需求)")
  115. return "metric_evaluator"
  116. # 计算覆盖率
  117. progress = get_calculation_progress(state)
  118. coverage = progress["coverage_rate"]
  119. print(f" 指标覆盖率 = {coverage:.2%}")
  120. # 如果有待计算指标且覆盖率 < 100% → 计算指标
  121. if state.get("pending_metric_ids") and coverage < 1.0:
  122. print(f"→ 路由到 metric_calculator(计算指标,覆盖率={coverage:.2%})")
  123. return "metric_calculator"
  124. # 检查是否应该结束流程
  125. pending_ids = state.get("pending_metric_ids", [])
  126. failed_attempts = state.get("failed_metric_attempts", {})
  127. max_retries = 3
  128. # 计算还有哪些指标可以重试(未达到最大重试次数)
  129. retryable_metrics = [
  130. mid for mid in pending_ids
  131. if failed_attempts.get(mid, 0) < max_retries
  132. ]
  133. # 如果覆盖率 >= 80%,或者没有可重试的指标 → 结束流程
  134. if coverage >= 0.8 or not retryable_metrics:
  135. reason = "覆盖率达到80%" if coverage >= 0.8 else "没有可重试指标"
  136. print(f"→ 结束流程(覆盖率={coverage:.2%},原因:{reason})")
  137. return END
  138. # 默认返回规划节点
  139. return "planning_node"
  140. async def _planning_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  141. """规划节点:分析状态并做出决策"""
  142. try:
  143. print("🧠 正在执行规划分析...")
  144. # 使用规划智能体做出决策
  145. decision = await plan_next_action(
  146. question=state["question"],
  147. industry=state["industry"],
  148. current_state=state,
  149. api_key=self.api_key
  150. )
  151. # 更新状态
  152. new_state = update_state_with_planning_decision(state, {
  153. "decision": decision.decision,
  154. "next_route": self._decision_to_route(decision.decision),
  155. "metrics_to_compute": decision.metrics_to_compute
  156. })
  157. # 添加决策消息
  158. decision_msg = self._format_decision_message(decision)
  159. new_state["messages"].append({
  160. "role": "assistant",
  161. "content": decision_msg,
  162. "timestamp": datetime.now().isoformat()
  163. })
  164. print(f"✅ 规划决策完成:{decision.decision}")
  165. return convert_numpy_types(new_state)
  166. except Exception as e:
  167. print(f"❌ 规划节点执行失败: {e}")
  168. new_state = state.copy()
  169. new_state["errors"].append(f"规划节点错误: {str(e)}")
  170. return convert_numpy_types(new_state)
  171. async def _outline_generator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  172. """大纲生成节点"""
  173. try:
  174. print("📝 正在生成报告大纲...")
  175. # 生成大纲(支持重试机制)
  176. outline = await generate_report_outline(
  177. question=state["question"],
  178. industry=state["industry"],
  179. sample_data=state["data_set"][:3], # 使用前3个样本
  180. api_key=self.api_key,
  181. max_retries=3, # 最多重试5次
  182. retry_delay=3.0 # 每次重试间隔3秒
  183. )
  184. # 更新状态
  185. new_state = update_state_with_outline_generation(state, outline)
  186. print(f"✅ 大纲生成完成:{outline.report_title}")
  187. print(f" 包含 {len(outline.sections)} 个章节,{len(outline.global_metrics)} 个指标需求")
  188. # 分析并打印AI的指标选择推理过程
  189. self._print_ai_selection_analysis(outline)
  190. return convert_numpy_types(new_state)
  191. except Exception as e:
  192. print(f"❌ 大纲生成失败: {e}")
  193. new_state = state.copy()
  194. new_state["errors"].append(f"大纲生成错误: {str(e)}")
  195. return convert_numpy_types(new_state)
  196. async def _data_classify_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  197. """数据分类打标节点"""
  198. try:
  199. print("📝 正在对数据进行分类打标...")
  200. # 对数据进行分类打标
  201. data_set_classified = await data_classify(
  202. industry=state["industry"],
  203. data_set=state["data_set"],
  204. file_name=state["file_name"]
  205. )
  206. # 更新状态
  207. new_state = update_state_with_data_classified(state, data_set_classified)
  208. print(f"✅ 数据分类打标完成,打标记录数: {len(data_set_classified)}")
  209. return convert_numpy_types(new_state)
  210. except Exception as e:
  211. print(f"❌ 数据分类打标失败: {e}")
  212. new_state = state.copy()
  213. new_state["errors"].append(f"数据分类打标错误: {str(e)}")
  214. return convert_numpy_types(new_state)
  215. def _print_ai_selection_analysis(self, outline):
  216. """打印AI指标选择的推理过程分析 - 完全通用版本"""
  217. print()
  218. print('╔══════════════════════════════════════════════════════════════════════════════╗')
  219. print('║ 🤖 AI指标选择分析 ║')
  220. print('╚══════════════════════════════════════════════════════════════════════════════╝')
  221. print()
  222. # 计算总指标数 - outline可能是字典格式,需要适配
  223. if hasattr(outline, 'sections'):
  224. # Pydantic模型格式
  225. total_metrics = sum(len(section.metrics_needed) for section in outline.sections)
  226. sections = outline.sections
  227. else:
  228. # 字典格式
  229. total_metrics = sum(len(section.get('metrics_needed', [])) for section in outline.get('sections', []))
  230. sections = outline.get('sections', [])
  231. # 获取可用指标总数(这里可以从状态或其他地方动态获取)
  232. available_count = 26 # 这个可以从API调用中动态获取
  233. print('📊 选择统计:')
  234. print(' ┌─────────────────────────────────────────────────────────────────────┐')
  235. print(' │ 系统可用指标: {}个 │ AI本次选择: {}个 │ 选择率: {:.1f}% │'.format(
  236. available_count, total_metrics, total_metrics/available_count*100 if available_count > 0 else 0))
  237. print(' └─────────────────────────────────────────────────────────────────────┘')
  238. print()
  239. print('📋 AI决策过程:')
  240. print(' 大模型已根据用户需求从{}个可用指标中选择了{}个最相关的指标。'.format(available_count, total_metrics))
  241. print(' 选择过程完全由大模型基于语义理解和业务逻辑进行,不涉及任何硬编码规则。')
  242. print()
  243. print('🔍 选择结果:')
  244. print(' • 总章节数: {}个'.format(len(sections)))
  245. print(' • 平均每章节指标数: {:.1f}个'.format(total_metrics/len(sections) if sections else 0))
  246. print(' • 选择策略: 基于用户需求的相关性分析')
  247. print()
  248. print('🎯 AI Agent核心能力:')
  249. print(' • 语义理解: 理解用户查询的业务意图和分析需求')
  250. print(' • 智能筛选: 从海量指标中挑选最相关的组合')
  251. print(' • 逻辑推理: 为每个分析维度提供充分的选择依据')
  252. print(' • 动态适配: 根据不同场景自动调整选择策略')
  253. print()
  254. print('💡 关键洞察:')
  255. print(' AI Agent通过大模型的推理能力,实现了超越传统规则引擎的智能化指标选择,')
  256. print(' 能够根据具体业务场景动态调整分析框架,确保分析的针对性和有效性。')
  257. print()
  258. async def _metric_calculator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  259. """指标计算节点"""
  260. try:
  261. # 检查计算模式
  262. use_rules_engine_only = state.get("use_rules_engine_only", False)
  263. use_traditional_engine_only = state.get("use_traditional_engine_only", False)
  264. if use_rules_engine_only:
  265. print("🧮 正在执行规则引擎指标计算(专用模式)...")
  266. elif use_traditional_engine_only:
  267. print("🧮 正在执行传统引擎指标计算(专用模式)...")
  268. else:
  269. print("🧮 正在执行指标计算...")
  270. new_state = state.copy()
  271. # 使用规划决策指定的指标批次,如果没有指定则使用所有待计算指标
  272. current_batch = state.get("current_batch_metrics", [])
  273. if current_batch:
  274. pending_ids = current_batch
  275. print(f"🧮 本次计算批次包含 {len(pending_ids)} 个指标")
  276. else:
  277. pending_ids = state.get("pending_metric_ids", [])
  278. print(f"🧮 计算所有待计算指标,共 {len(pending_ids)} 个")
  279. if not pending_ids:
  280. print("⚠️ 没有待计算的指标")
  281. return convert_numpy_types(new_state)
  282. # 获取指标需求信息
  283. metrics_requirements = state.get("metrics_requirements", [])
  284. if not metrics_requirements:
  285. print("⚠️ 没有指标需求信息")
  286. return convert_numpy_types(new_state)
  287. # 计算成功和失败的指标
  288. successful_calculations = 0
  289. failed_calculations = 0
  290. # 遍历待计算的指标(创建副本避免修改时遍历的问题)
  291. for metric_id in pending_ids.copy():
  292. try:
  293. # 找到对应的指标需求
  294. metric_req = next((m for m in metrics_requirements if m.metric_id == metric_id), None)
  295. if not metric_req:
  296. # 修复:找不到指标需求时,创建临时的指标需求结构,避免跳过指标
  297. print(f"⚠️ 指标 {metric_id} 找不到需求信息,创建临时配置继续计算")
  298. metric_req = type('MetricRequirement', (), {
  299. 'metric_id': metric_id,
  300. 'metric_name': metric_id.replace('metric-', '') if metric_id.startswith('metric-') else metric_id,
  301. 'calculation_logic': f'计算 {metric_id}',
  302. 'required_fields': ['transactions'],
  303. 'dependencies': []
  304. })()
  305. print(f"🧮 计算指标: {metric_id} - {metric_req.metric_name}")
  306. # 根据模式决定使用哪种计算方式
  307. if use_rules_engine_only:
  308. # 只使用规则引擎计算
  309. use_rules_engine = True
  310. print(f" 使用规则引擎模式")
  311. elif use_traditional_engine_only:
  312. # 只使用传统引擎计算
  313. use_rules_engine = False
  314. print(f" 使用传统引擎模式")
  315. else:
  316. # 自动选择计算方式:优先使用规则引擎,只在规则引擎不可用时使用传统计算
  317. use_rules_engine = True # 默认使用规则引擎计算所有指标
  318. if use_rules_engine:
  319. # 使用规则引擎计算
  320. # 现在metric_id已经是知识ID,直接使用它作为配置名
  321. config_name = metric_id # metric_id 已经是知识ID,如 "metric-分析账户数量"
  322. intent_result = {
  323. "target_configs": [config_name],
  324. "intent_category": "指标计算"
  325. }
  326. print(f" 使用知识ID: {config_name}")
  327. # 将打好标的数据集传入指标计算函数中
  328. data_set_classified = state.get("data_set_classified", [])
  329. results = await self.rules_engine_agent.calculate_metrics(intent_result, data_set_classified)
  330. else:
  331. # 使用传统指标计算(模拟)
  332. # 这里简化处理,实际应该根据配置文件调用相应的API
  333. results = {
  334. "success": True,
  335. "results": [{
  336. "config_name": metric_req.metric_id,
  337. "result": {
  338. "success": True,
  339. "data": f"传统引擎计算结果:{metric_req.metric_name}",
  340. "value": 100.0 # 模拟数值
  341. }
  342. }]
  343. }
  344. # 处理计算结果
  345. calculation_success = False
  346. for result in results.get("results", []):
  347. if result.get("result", {}).get("success"):
  348. # 计算成功
  349. new_state["computed_metrics"][metric_id] = result["result"]
  350. successful_calculations += 1
  351. calculation_success = True
  352. print(f"✅ 指标 {metric_id} 计算成功")
  353. break # 找到一个成功的就算成功
  354. else:
  355. # 计算失败
  356. failed_calculations += 1
  357. print(f"❌ 指标 {metric_id} 计算失败")
  358. # 初始化失败尝试记录
  359. if "failed_metric_attempts" not in new_state:
  360. new_state["failed_metric_attempts"] = {}
  361. # 根据计算结果处理指标
  362. if calculation_success:
  363. # 计算成功:从待计算列表中移除
  364. if metric_id in new_state["pending_metric_ids"]:
  365. new_state["pending_metric_ids"].remove(metric_id)
  366. # 重置失败计数
  367. new_state["failed_metric_attempts"].pop(metric_id, None)
  368. else:
  369. # 计算失败:记录失败次数,不从待计算列表移除
  370. new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
  371. max_retries = 3
  372. if new_state["failed_metric_attempts"][metric_id] >= max_retries:
  373. print(f"⚠️ 指标 {metric_id} 已达到最大重试次数 ({max_retries}),从待计算列表中移除")
  374. if metric_id in new_state["pending_metric_ids"]:
  375. new_state["pending_metric_ids"].remove(metric_id)
  376. except Exception as e:
  377. print(f"❌ 计算指标 {metric_id} 时发生异常: {e}")
  378. failed_calculations += 1
  379. # 初始化失败尝试记录
  380. if "failed_metric_attempts" not in new_state:
  381. new_state["failed_metric_attempts"] = {}
  382. # 记录失败次数
  383. new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
  384. max_retries = 3
  385. if new_state["failed_metric_attempts"][metric_id] >= max_retries:
  386. print(f"⚠️ 指标 {metric_id} 异常已达到最大重试次数 ({max_retries}),从待计算列表中移除")
  387. if metric_id in new_state["pending_metric_ids"]:
  388. new_state["pending_metric_ids"].remove(metric_id)
  389. # 更新计算结果统计
  390. new_state["calculation_results"] = {
  391. "total_configs": len(pending_ids),
  392. "successful_calculations": successful_calculations,
  393. "failed_calculations": failed_calculations
  394. }
  395. # 添加消息
  396. if use_rules_engine_only:
  397. message_content = f"🧮 规则引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  398. elif use_traditional_engine_only:
  399. message_content = f"🧮 传统引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  400. else:
  401. message_content = f"🧮 指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  402. new_state["messages"].append({
  403. "role": "assistant",
  404. "content": message_content,
  405. "timestamp": datetime.now().isoformat()
  406. })
  407. if use_rules_engine_only:
  408. print(f"✅ 规则引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  409. elif use_traditional_engine_only:
  410. print(f"✅ 传统引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  411. else:
  412. print(f"✅ 指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  413. return convert_numpy_types(new_state)
  414. except Exception as e:
  415. print(f"❌ 指标计算节点失败: {e}")
  416. new_state = state.copy()
  417. new_state["errors"].append(f"指标计算错误: {str(e)}")
  418. return convert_numpy_types(new_state)
  419. def _decision_to_route(self, decision: str) -> str:
  420. """将规划决策转换为路由"""
  421. decision_routes = {
  422. "data_classify": "data_classify",
  423. "generate_outline": "outline_generator",
  424. "compute_metrics": "metric_calculator",
  425. "finalize_report": END # 直接结束流程
  426. }
  427. return decision_routes.get(decision, "planning_node")
  428. def _format_decision_message(self, decision: Any) -> str:
  429. """格式化决策消息"""
  430. try:
  431. decision_type = getattr(decision, 'decision', 'unknown')
  432. reasoning = getattr(decision, 'reasoning', '')
  433. if decision_type == "compute_metrics" and hasattr(decision, 'metrics_to_compute'):
  434. metrics = decision.metrics_to_compute
  435. return f"🧮 规划决策:计算 {len(metrics)} 个指标"
  436. elif decision_type == "finalize_report":
  437. return f"✅ 规划决策:生成最终报告"
  438. elif decision_type == "generate_outline":
  439. return f"📋 规划决策:生成大纲"
  440. else:
  441. return f"🤔 规划决策:{decision_type}"
  442. except:
  443. return "🤔 规划决策已完成"
  444. async def run_workflow(self, question: str, industry: str, data: List[Dict[str, Any]], file_name: str, session_id: str = None, use_rules_engine_only: bool = False, use_traditional_engine_only: bool = False) -> Dict[str, Any]:
  445. """
  446. 运行完整的工作流
  447. Args:
  448. question: 用户查询
  449. industry: 行业
  450. data: 数据集
  451. file_name: 数据文件名称
  452. session_id: 会话ID
  453. use_rules_engine_only: 是否只使用规则引擎指标计算
  454. use_traditional_engine_only: 是否只使用传统引擎指标计算
  455. Returns:
  456. 工作流结果
  457. """
  458. try:
  459. print("🚀 启动完整智能体工作流...")
  460. print(f"问题:{question}")
  461. print(f"行业:{industry}")
  462. print(f"数据文件:{file_name}")
  463. print(f"数据条数:{len(data)}")
  464. if use_rules_engine_only:
  465. print("计算模式:只使用规则引擎")
  466. elif use_traditional_engine_only:
  467. print("计算模式:只使用传统引擎")
  468. else:
  469. print("计算模式:标准模式")
  470. # 创建初始状态
  471. initial_state = create_initial_integrated_state(question, industry, data, file_name, session_id)
  472. # 设置计算模式标记
  473. if use_rules_engine_only:
  474. initial_state["use_rules_engine_only"] = True
  475. initial_state["use_traditional_engine_only"] = False
  476. elif use_traditional_engine_only:
  477. initial_state["use_rules_engine_only"] = False
  478. initial_state["use_traditional_engine_only"] = True
  479. else:
  480. initial_state["use_rules_engine_only"] = False
  481. initial_state["use_traditional_engine_only"] = False
  482. # 编译工作流
  483. app = self.workflow.compile()
  484. # 执行工作流
  485. result = await app.ainvoke(initial_state)
  486. print("✅ 工作流执行完成")
  487. return {
  488. "success": True,
  489. "result": result,
  490. "answer": result.get("answer"),
  491. "report": result.get("report_draft"),
  492. "session_id": result.get("session_id"),
  493. "execution_summary": {
  494. "planning_steps": result.get("planning_step", 0),
  495. "outline_generated": result.get("outline_draft") is not None,
  496. "metrics_computed": len(result.get("computed_metrics", {})),
  497. "completion_rate": result.get("completeness_score", 0)
  498. }
  499. }
  500. except Exception as e:
  501. print(f"❌ 工作流执行失败: {e}")
  502. return {
  503. "success": False,
  504. "error": str(e),
  505. "result": None
  506. }
  507. # 便捷函数
  508. async def run_complete_agent_flow(question: str, industry: str, data: List[Dict[str, Any]], file_name: str, api_key: str, session_id: str = None, use_rules_engine_only: bool = False, use_traditional_engine_only: bool = False, run_id: str = None) -> Dict[str, Any]:
  509. """
  510. 运行完整智能体工作流的便捷函数
  511. Args:
  512. question: 用户查询
  513. data: 数据集
  514. file_name: 数据文件名称
  515. api_key: API密钥
  516. session_id: 会话ID
  517. use_rules_engine_only: 是否只使用规则引擎指标计算
  518. use_traditional_engine_only: 是否只使用传统引擎指标计算
  519. run_id: 运行ID,用于区分不同的运行实例
  520. Returns:
  521. 工作流结果
  522. """
  523. workflow = CompleteAgentFlow(api_key, run_id=run_id)
  524. return await workflow.run_workflow(question, industry, data, file_name, session_id, use_rules_engine_only, use_traditional_engine_only)