  1. """
  2. 完整的智能体工作流 (Complete Agent Flow)
  3. =====================================
  4. 此工作流整合了规划、大纲生成和指标计算四个核心智能体,实现完整的报告生成流程。
  5. 包含的智能体:
  6. 1. PlanningAgent (规划智能体) - 分析状态并做出决策
  7. 2. OutlineAgent (大纲生成智能体) - 生成报告结构和指标需求
  8. 3. MetricCalculationAgent (指标计算智能体) - 执行标准指标计算
  9. 4. RulesEngineMetricCalculationAgent (规则引擎指标计算智能体) - 执行规则引擎指标计算
  10. 工作流程:
  11. 1. 规划节点 → 分析当前状态,决定下一步行动
  12. 2. 大纲生成节点 → 生成报告大纲和指标需求
  13. 3. 指标判断节点 → 根据大纲确定需要计算的指标
  14. 4. 指标计算节点 → 执行具体的指标计算任务
  15. 技术特点:
  16. - 基于LangGraph的状态机工作流
  17. - 支持条件路由和状态管理
  18. - 完善的错误处理机制
  19. - 详细的执行日志记录
  20. 作者: Big Agent Team
  21. 版本: 1.0.0
  22. 创建时间: 2024-12-20
  23. """
  24. import asyncio
  25. from typing import Dict, Any, List
  26. from datetime import datetime
  27. from langgraph.graph import StateGraph, END
  28. from llmops.workflow_state import (
  29. IntegratedWorkflowState,
  30. create_initial_integrated_state,
  31. get_calculation_progress,
  32. update_state_with_outline_generation,
  33. update_state_with_planning_decision,
  34. update_state_with_data_classified,
  35. convert_numpy_types,
  36. update_state_with_data_standardize,
  37. update_state_with_report,
  38. update_state_with_anomaly_recognition
  39. )
  40. from llmops.agents.outline_agent import generate_report_outline
  41. from llmops.agents.planning_agent import plan_next_action
  42. from llmops.agents.rules_engine_metric_calculation_agent import RulesEngineMetricCalculationAgent
  43. from llmops.agents.anomaly_recognizer_agent import AnomalyRecognitionAgent
  44. from llmops.agents.data_manager import DataManager
  45. import os
  46. from llmops.agents.data_classify_agent import data_classify
  47. from llmops.config import multimodal_api_url, LLM_API_KEY, LLM_BASE_URL, LLM_MODEL_NAME
  48. from llmops.agents.data_stardard import data_standardize
  49. from llmops.agents.report_agent import generate_report_section_content
  50. class CompleteAgentFlow:
  51. """完整的智能体工作流"""
  52. def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com", model_name: str = "deepseek-chat"):
  53. """
  54. 初始化完整的工作流
  55. Args:
  56. api_key: DeepSeek API密钥
  57. base_url: DeepSeek API基础URL
  58. model_name: 模型名称
  59. """
  60. self.api_key = api_key
  61. self.base_url = base_url
  62. self.model_name = model_name
  63. # 初始规则引擎智能体
  64. self.rules_engine_agent = RulesEngineMetricCalculationAgent(api_key, base_url)
  65. # 异常识别智能体
  66. self.anomaly_recognizer = None
  67. # 创建工作流图
  68. self.workflow = self._create_workflow()
  69. def _create_workflow(self) -> StateGraph:
  70. """创建LangGraph工作流"""
  71. workflow = StateGraph(IntegratedWorkflowState)
  72. # 添加节点
  73. workflow.add_node("planning_node", self._planning_node)
  74. workflow.add_node("outline_generator", self._outline_generator_node)
  75. workflow.add_node("metric_calculator", self._metric_calculator_node)
  76. workflow.add_node("data_classify", self._data_classify_node)
  77. workflow.add_node("data_standardize", self._data_standardize_node)
  78. workflow.add_node("anomaly_recognition", self._anomaly_recognition_node)
  79. workflow.add_node("report_generator", self._report_generator_node)
  80. # 设置入口点
  81. workflow.set_entry_point("planning_node")
  82. # 添加条件边 - 基于规划决策路由
  83. workflow.add_conditional_edges(
  84. "planning_node",
  85. self._route_from_planning,
  86. {
  87. "outline_generator": "outline_generator",
  88. "metric_calculator": "metric_calculator",
  89. "data_classify": "data_classify",
  90. "data_standardize": "data_standardize",
  91. "anomaly_recognition": "anomaly_recognition",
  92. "report_generator": "report_generator",
  93. END: END
  94. }
  95. )
  96. # 从各个节点返回规划节点重新决策
  97. workflow.add_edge("data_standardize", "planning_node")
  98. workflow.add_edge("data_classify", "planning_node")
  99. workflow.add_edge("outline_generator", "planning_node")
  100. workflow.add_edge("metric_calculator", "planning_node")
  101. workflow.add_edge("anomaly_recognition", "planning_node")
  102. workflow.add_edge("report_generator", END)
  103. return workflow
  104. def _route_from_planning(self, state: IntegratedWorkflowState) -> str:
  105. """
  106. 从规划节点路由到下一个节点
  107. Args:
  108. state: 当前状态
  109. Returns:
  110. 目标节点名称
  111. """
  112. print(f"\n🔍 [路由决策] 步骤={state['planning_step']}, "
  113. f"数据集分类打标数量={len(state.get('data_set_classified', []))}",
  114. f"大纲={state.get('outline_draft') is not None}, "
  115. f"指标需求={len(state.get('metrics_requirements', []))}")
  116. # 防止无限循环
  117. if state['planning_step'] > 30:
  118. print("⚠️ 规划步骤超过30次,强制结束流程")
  119. return END
  120. # 数据标准化状态 0 → 数据标准化
  121. if state.get("is_standardized", 0) == 0:
  122. print("→ 路由到 data_standardize(数据标准化)")
  123. return "data_standardize"
  124. # 数据分类打标数量为0 → 分类打标
  125. if len(state.get("data_set_classified", [])) == 0:
  126. print("→ 路由到 data_classify(分类打标)")
  127. return "data_classify"
  128. # 异常识别未完成且有标准化数据 → 异常识别
  129. if not state.get("anomaly_recognition_completed", False) and state.get("standardized_file_path"):
  130. print("→ 路由到 anomaly_recognition(异常识别)")
  131. return "anomaly_recognition"
  132. # 如果大纲为空 → 生成大纲
  133. if not state.get("outline_draft"):
  134. print("→ 路由到 outline_generator(生成大纲)")
  135. return "outline_generator"
  136. # 如果指标需求为空但大纲已生成 → 评估指标需求
  137. if not state.get("metrics_requirements") and state.get("outline_draft"):
  138. print("→ 路由到 metric_evaluator(评估指标需求)")
  139. return "metric_evaluator"
  140. # 计算覆盖率
  141. progress = get_calculation_progress(state)
  142. coverage = progress["coverage_rate"]
  143. print(f" 指标覆盖率 = {coverage:.2%}")
  144. # 如果有待计算指标且覆盖率 < 100% → 计算指标
  145. if state.get("pending_metric_ids") and coverage < 1.0:
  146. print(f"→ 路由到 metric_calculator(计算指标,覆盖率={coverage:.2%})")
  147. return "metric_calculator"
  148. # 检查是否应该结束流程
  149. pending_ids = state.get("pending_metric_ids", [])
  150. failed_attempts = state.get("failed_metric_attempts", {})
  151. max_retries = 3
  152. # 计算还有哪些指标可以重试(未达到最大重试次数)
  153. retryable_metrics = [
  154. mid for mid in pending_ids
  155. if failed_attempts.get(mid, 0) < max_retries
  156. ]
  157. # 如果覆盖率 >= 80%,或者没有可重试的指标 → 生成报告
  158. if coverage >= 0.8 or not retryable_metrics:
  159. reason = "覆盖率达到80%" if coverage >= 0.8 else "没有可重试指标"
  160. print(f"→ 指标计算完成,进入生成报告(覆盖率={coverage:.2%},原因:{reason})")
  161. return "report_generator"
  162. # 默认返回规划节点
  163. return "planning_node"
  164. async def _planning_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  165. """规划节点:分析状态并做出决策"""
  166. try:
  167. print("🧠 正在执行规划分析...")
  168. # 使用规划智能体做出决策
  169. decision = await plan_next_action(
  170. question=state["question"],
  171. industry=state["industry"],
  172. current_state=state,
  173. api_key=self.api_key,
  174. base_url=self.base_url,
  175. model_name=self.model_name
  176. )
  177. # 更新状态
  178. new_state = update_state_with_planning_decision(state, {
  179. "decision": decision.decision,
  180. "next_route": self._decision_to_route(decision.decision),
  181. "metrics_to_compute": decision.metrics_to_compute
  182. })
  183. # 添加决策消息
  184. decision_msg = self._format_decision_message(decision)
  185. new_state["messages"].append({
  186. "role": "assistant",
  187. "content": decision_msg,
  188. "timestamp": datetime.now().isoformat()
  189. })
  190. print(f"✅ 规划决策完成:{decision.decision}")
  191. return convert_numpy_types(new_state)
  192. except Exception as e:
  193. print(f"❌ 规划节点执行失败: {e}")
  194. new_state = state.copy()
  195. new_state["errors"].append(f"规划节点错误: {str(e)}")
  196. return convert_numpy_types(new_state)
  197. async def _outline_generator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  198. """大纲生成节点"""
  199. try:
  200. print("📝 正在生成报告大纲...")
  201. # 生成大纲(支持重试机制)
  202. outline = await generate_report_outline(
  203. question=state["question"],
  204. industry=state["industry"],
  205. sample_data=state["data_set"][:3], # 使用前3个样本
  206. api_key=self.api_key,
  207. base_url=self.base_url,
  208. model_name=self.model_name,
  209. max_retries=3, # 最多重试5次
  210. retry_delay=3.0 # 每次重试间隔3秒
  211. )
  212. # 更新状态
  213. new_state = update_state_with_outline_generation(state, outline)
  214. print(f"✅ 大纲生成完成:{outline.report_title}")
  215. print(f" 包含 {len(outline.sections)} 个章节,{len(outline.global_metrics)} 个指标需求")
  216. # 分析并打印AI的指标选择推理过程
  217. self._print_ai_selection_analysis(outline)
  218. return convert_numpy_types(new_state)
  219. except Exception as e:
  220. print(f"❌ 大纲生成失败: {e}")
  221. new_state = state.copy()
  222. new_state["errors"].append(f"大纲生成错误: {str(e)}")
  223. return convert_numpy_types(new_state)
  224. async def _data_classify_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  225. """数据分类打标节点"""
  226. try:
  227. standardized_file_path = state["standardized_file_path"]
  228. file_name = os.path.basename(standardized_file_path)
  229. # 读取标准化后的数据文件
  230. data_set = DataManager.load_data_from_csv_file(standardized_file_path)
  231. # 加载测试数据集并展示两条样例
  232. print(f"📊 读取标准化数据文件: {file_name}, 加载 {len(data_set)} 条记录")
  233. print(f"测试数据样例: {data_set[0:1]}")
  234. print("📝 正在对数据进行分类打标...")
  235. # 对数据进行分类打标
  236. data_set_classified = await data_classify(
  237. industry=state["industry"],
  238. data_set=data_set,
  239. file_name=state["file_name"]
  240. )
  241. # 更新状态
  242. new_state = update_state_with_data_classified(state, data_set_classified)
  243. print(f"✅ 数据分类打标完成,打标记录数: {len(data_set_classified)}")
  244. return convert_numpy_types(new_state)
  245. except Exception as e:
  246. print(f"❌ 数据分类打标失败: {e}")
  247. new_state = state.copy()
  248. new_state["errors"].append(f"数据分类打标错误: {str(e)}")
  249. return convert_numpy_types(new_state)
  250. async def _data_standardize_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  251. """数据标准化节点"""
  252. try:
  253. print("📝 正在对数据进行标准化处理...")
  254. # 数据标准化处理
  255. result = await data_standardize(
  256. api_key=self.api_key,
  257. base_url=self.base_url,
  258. model_name=self.model_name,
  259. multimodal_api_url=multimodal_api_url,
  260. input_file_path=state["original_file_path"]
  261. )
  262. is_succ = 0
  263. standardized_file_path = None
  264. if result["status"] == "success": # 数据标准化成功
  265. is_succ = 1
  266. standardized_file_path = result["file_path"]
  267. # 更新状态
  268. new_state = update_state_with_data_standardize(state, is_succ, standardized_file_path)
  269. print(f"✅ 数据标准化完成,处理状态: {is_succ},标准化文件路径:{standardized_file_path}")
  270. return convert_numpy_types(new_state)
  271. except Exception as e:
  272. print(f"❌ 数据标准化失败: {e}")
  273. new_state = state.copy()
  274. new_state["errors"].append(f"数据标准化错误: {str(e)}")
  275. return convert_numpy_types(new_state)
  276. async def _report_generator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  277. """报告完成节点:生成最终报告"""
  278. try:
  279. print("📋 正在生成最终报告...")
  280. # 获取大纲和计算结果
  281. outline = state.get("outline_draft")
  282. computed_metrics = state.get("computed_metrics", {})
  283. anomaly_summary = state.get("anomaly_summary", {})
  284. print(f"已经完成的计算指标:{computed_metrics}")
  285. print(f"异常识别结果:{anomaly_summary.get('total_anomalies', 0)}条异常")
  286. if not outline:
  287. raise ValueError("没有可用的报告大纲")
  288. # 生成最终报告
  289. final_report = {
  290. "title": outline.report_title,
  291. "generated_at": datetime.now().isoformat(),
  292. "summary": {
  293. "total_sections": len(outline.sections),
  294. "total_metrics_required": len(outline.global_metrics),
  295. "total_metrics_computed": len(computed_metrics),
  296. "planning_steps": state.get("planning_step", 0),
  297. "completion_rate": len(computed_metrics) / len(
  298. outline.global_metrics) if outline.global_metrics else 0
  299. },
  300. "sections": [],
  301. # "metrics_detail": {},
  302. "anomaly_analysis": anomaly_summary
  303. }
  304. chapter_num = 0
  305. total_sections = len(outline.sections)
  306. # 构建章节内容
  307. for section in outline.sections:
  308. section_content = {
  309. "section_id": section.section_id,
  310. "title": section.title,
  311. "description": section.description,
  312. "metrics": {}
  313. }
  314. # 添加该章节的指标数据
  315. for metric_id in section.metrics_needed:
  316. if metric_id in computed_metrics:
  317. section_content["metrics"][metric_id] = computed_metrics[metric_id]
  318. else:
  319. if not metric_id.startswith("metric-"):
  320. # 指标缺少metric前缀,进行补充
  321. section_content["metrics"][metric_id] = computed_metrics["metric-"+metric_id]
  322. else:
  323. section_content["metrics"][metric_id] = "数据缺失"
  324. chapter_num += 1
  325. # 生成章节内容
  326. chapter_content = await generate_report_section_content(api_key=self.api_key, base_url=self.base_url, model_name=self.model_name, section=section_content, chapter_num=chapter_num, total_sections=total_sections)
  327. print(f"生成章节内容:{chapter_content}")
  328. section_content["content"] = chapter_content
  329. final_report["sections"].append(section_content)
  330. # 添加详细的指标信息
  331. # for metric_req in outline.global_metrics:
  332. # metric_id = metric_req.metric_id
  333. # final_report["metrics_detail"][metric_id] = {
  334. # "name": metric_req.metric_name,
  335. # "logic": metric_req.calculation_logic,
  336. # "required_fields": metric_req.required_fields,
  337. # "computed": metric_id in computed_metrics,
  338. # "value": computed_metrics.get(metric_id, {}).get("value", "N/A")
  339. # }
  340. # 添加异常识别章节(如果存在异常)
  341. if anomaly_summary.get('total_anomalies', 0) > 0:
  342. anomaly_section = {
  343. "section_id": "anomaly_analysis",
  344. "title": "交易异常识别分析",
  345. "description": "基于交易流水数据识别的异常交易情况分析",
  346. "content": self._generate_anomaly_analysis_content(anomaly_summary),
  347. "metrics": {}
  348. }
  349. final_report["sections"].append(anomaly_section)
  350. # 更新状态
  351. new_state = update_state_with_report(state, final_report)
  352. # 添加完成消息
  353. new_state["messages"].append({
  354. "role": "assistant",
  355. "content": f"🎉 完整报告生成流程完成:{outline.report_title}",
  356. "timestamp": datetime.now().isoformat()
  357. })
  358. print(f"✅ 最终报告生成完成:{outline.report_title}")
  359. print(f" 章节数:{len(final_report['sections'])}")
  360. print(f" 计算指标:{len(computed_metrics)}/{len(outline.global_metrics)}")
  361. print(f" 识别异常:{anomaly_summary.get('total_anomalies', 0)}条")
  362. print(".2%")
  363. return convert_numpy_types(new_state)
  364. except Exception as e:
  365. print(f"❌ 报告完成失败: {e}")
  366. new_state = state.copy()
  367. new_state["errors"].append(f"报告完成错误: {str(e)}")
  368. return convert_numpy_types(new_state)
  369. def _generate_anomaly_analysis_content(self, anomaly_summary: Dict[str, Any]) -> str:
  370. """生成异常分析章节内容"""
  371. total_anomalies = anomaly_summary.get('total_anomalies', 0)
  372. anomaly_ratio = anomaly_summary.get('anomaly_ratio', '0%')
  373. content = f"""
  374. ## 交易异常识别分析
  375. ### 异常识别概况
  376. 本次分析共识别出 **{total_anomalies}** 条异常交易记录,异常识别率为 **{anomaly_ratio}**。
  377. ### 异常类型分布
  378. """
  379. # 添加异常类型分布
  380. anomaly_distribution = anomaly_summary.get('anomaly_distribution', {})
  381. if anomaly_distribution:
  382. content += "\n| 异常类型 | 数量 | 占比 |\n|----------|------|------|\n"
  383. for anomaly_type, count in anomaly_distribution.items():
  384. percentage = (count / total_anomalies * 100) if total_anomalies > 0 else 0
  385. content += f"| {anomaly_type} | {count} | {percentage:.1f}% |\n"
  386. # 添加严重程度分布
  387. severity_distribution = anomaly_summary.get('severity_distribution', {})
  388. if severity_distribution:
  389. content += "\n### 严重程度分布\n"
  390. for severity, count in severity_distribution.items():
  391. percentage = (count / total_anomalies * 100) if total_anomalies > 0 else 0
  392. content += f"- **{severity.upper()}** 级别:{count} 条 ({percentage:.1f}%)\n"
  393. content += """
  394. ### 分析建议
  395. 1. 建议对高风险异常进行重点核查
  396. 2. 结合业务背景判断异常交易的真实性
  397. 3. 建立异常交易监控机制
  398. """
  399. return content
  400. def _print_ai_selection_analysis(self, outline):
  401. """打印AI指标选择的推理过程分析 - 完全通用版本"""
  402. print()
  403. print('╔══════════════════════════════════════════════════════════════════════════════╗')
  404. print('║ 🤖 AI指标选择分析 ║')
  405. print('╚══════════════════════════════════════════════════════════════════════════════╝')
  406. print()
  407. # 计算总指标数 - outline可能是字典格式,需要适配
  408. if hasattr(outline, 'sections'):
  409. # Pydantic模型格式
  410. total_metrics = sum(len(section.metrics_needed) for section in outline.sections)
  411. sections = outline.sections
  412. else:
  413. # 字典格式
  414. total_metrics = sum(len(section.get('metrics_needed', [])) for section in outline.get('sections', []))
  415. sections = outline.get('sections', [])
  416. # 获取可用指标总数(这里可以从状态或其他地方动态获取)
  417. available_count = 26 # 这个可以从API调用中动态获取
  418. print('📊 选择统计:')
  419. print(' ┌─────────────────────────────────────────────────────────────────────┐')
  420. print(' │ 系统可用指标: {}个 │ AI本次选择: {}个 │ 选择率: {:.1f}% │'.format(
  421. available_count, total_metrics, total_metrics/available_count*100 if available_count > 0 else 0))
  422. print(' └─────────────────────────────────────────────────────────────────────┘')
  423. print()
  424. print('📋 AI决策过程:')
  425. print(' 大模型已根据用户需求从{}个可用指标中选择了{}个最相关的指标。'.format(available_count, total_metrics))
  426. print(' 选择过程完全由大模型基于语义理解和业务逻辑进行,不涉及任何硬编码规则。')
  427. print()
  428. print('🔍 选择结果:')
  429. print(' • 总章节数: {}个'.format(len(sections)))
  430. print(' • 平均每章节指标数: {:.1f}个'.format(total_metrics/len(sections) if sections else 0))
  431. print(' • 选择策略: 基于用户需求的相关性分析')
  432. print()
  433. print('🎯 AI Agent核心能力:')
  434. print(' • 语义理解: 理解用户查询的业务意图和分析需求')
  435. print(' • 智能筛选: 从海量指标中挑选最相关的组合')
  436. print(' • 逻辑推理: 为每个分析维度提供充分的选择依据')
  437. print(' • 动态适配: 根据不同场景自动调整选择策略')
  438. print()
  439. print('💡 关键洞察:')
  440. print(' AI Agent通过大模型的推理能力,实现了超越传统规则引擎的智能化指标选择,')
  441. print(' 能够根据具体业务场景动态调整分析框架,确保分析的针对性和有效性。')
  442. print()
  443. async def _metric_calculator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  444. """指标计算节点"""
  445. try:
  446. # 检查计算模式
  447. use_rules_engine_only = state.get("use_rules_engine_only", False)
  448. use_traditional_engine_only = state.get("use_traditional_engine_only", False)
  449. if use_rules_engine_only:
  450. print("🧮 正在执行规则引擎指标计算(专用模式)...")
  451. elif use_traditional_engine_only:
  452. print("🧮 正在执行传统引擎指标计算(专用模式)...")
  453. else:
  454. print("🧮 正在执行指标计算...")
  455. new_state = state.copy()
  456. # 使用规划决策指定的指标批次,如果没有指定则使用所有待计算指标
  457. current_batch = state.get("current_batch_metrics", [])
  458. if current_batch:
  459. pending_ids = current_batch
  460. print(f"🧮 本次计算批次包含 {len(pending_ids)} 个指标")
  461. else:
  462. pending_ids = state.get("pending_metric_ids", [])
  463. print(f"🧮 计算所有待计算指标,共 {len(pending_ids)} 个")
  464. if not pending_ids:
  465. print("⚠️ 没有待计算的指标")
  466. return convert_numpy_types(new_state)
  467. # 获取指标需求信息
  468. metrics_requirements = state.get("metrics_requirements", [])
  469. if not metrics_requirements:
  470. print("⚠️ 没有指标需求信息")
  471. return convert_numpy_types(new_state)
  472. # 计算成功和失败的指标
  473. successful_calculations = 0
  474. failed_calculations = 0
  475. # 遍历待计算的指标(创建副本避免修改时遍历的问题)
  476. for metric_id in pending_ids.copy():
  477. try:
  478. # 找到对应的指标需求
  479. metric_req = next((m for m in metrics_requirements if m.metric_id == metric_id), None)
  480. if not metric_req:
  481. # 修复:找不到指标需求时,创建临时的指标需求结构,避免跳过指标
  482. print(f"⚠️ 指标 {metric_id} 找不到需求信息,创建临时配置继续计算")
  483. metric_req = type('MetricRequirement', (), {
  484. 'metric_id': metric_id,
  485. 'metric_name': metric_id.replace('metric-', '') if metric_id.startswith('metric-') else metric_id,
  486. 'calculation_logic': f'计算 {metric_id}',
  487. 'required_fields': ['transactions'],
  488. 'dependencies': []
  489. })()
  490. print(f"🧮 计算指标: {metric_id} - {metric_req.metric_name}")
  491. # 根据模式决定使用哪种计算方式
  492. if use_rules_engine_only:
  493. # 只使用规则引擎计算
  494. use_rules_engine = True
  495. print(f" 使用规则引擎模式")
  496. elif use_traditional_engine_only:
  497. # 只使用传统引擎计算
  498. use_rules_engine = False
  499. print(f" 使用传统引擎模式")
  500. else:
  501. # 自动选择计算方式:优先使用规则引擎,只在规则引擎不可用时使用传统计算
  502. use_rules_engine = True # 默认使用规则引擎计算所有指标
  503. if use_rules_engine:
  504. # 使用规则引擎计算
  505. # 现在metric_id已经是知识ID,直接使用它作为配置名
  506. config_name = metric_id # metric_id 已经是知识ID,如 "metric-分析账户数量"
  507. intent_result = {
  508. "target_configs": [config_name],
  509. "intent_category": "指标计算"
  510. }
  511. print(f" 使用知识ID: {config_name}")
  512. # 将打好标的数据集传入指标计算函数中
  513. data_set_classified = state.get("data_set_classified", [])
  514. results = await self.rules_engine_agent.calculate_metrics(intent_result, data_set_classified)
  515. else:
  516. # 使用传统指标计算(模拟)
  517. # 这里简化处理,实际应该根据配置文件调用相应的API
  518. results = {
  519. "success": True,
  520. "results": [{
  521. "config_name": metric_req.metric_id,
  522. "result": {
  523. "success": True,
  524. "data": f"传统引擎计算结果:{metric_req.metric_name}",
  525. "value": 100.0 # 模拟数值
  526. }
  527. }]
  528. }
  529. # 处理计算结果
  530. calculation_success = False
  531. for result in results.get("results", []):
  532. if result.get("result", {}).get("success"):
  533. # 计算成功
  534. new_state["computed_metrics"][metric_id] = result["result"]
  535. successful_calculations += 1
  536. calculation_success = True
  537. print(f"✅ 指标 {metric_id} 计算成功")
  538. break # 找到一个成功的就算成功
  539. else:
  540. # 计算失败
  541. failed_calculations += 1
  542. print(f"❌ 指标 {metric_id} 计算失败")
  543. # 初始化失败尝试记录
  544. if "failed_metric_attempts" not in new_state:
  545. new_state["failed_metric_attempts"] = {}
  546. # 根据计算结果处理指标
  547. if calculation_success:
  548. # 计算成功:从待计算列表中移除
  549. if metric_id in new_state["pending_metric_ids"]:
  550. new_state["pending_metric_ids"].remove(metric_id)
  551. # 重置失败计数
  552. new_state["failed_metric_attempts"].pop(metric_id, None)
  553. else:
  554. # 计算失败:记录失败次数,不从待计算列表移除
  555. new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
  556. max_retries = 3
  557. if new_state["failed_metric_attempts"][metric_id] >= max_retries:
  558. print(f"⚠️ 指标 {metric_id} 已达到最大重试次数 ({max_retries}),从待计算列表中移除")
  559. if metric_id in new_state["pending_metric_ids"]:
  560. new_state["pending_metric_ids"].remove(metric_id)
  561. except Exception as e:
  562. print(f"❌ 计算指标 {metric_id} 时发生异常: {e}")
  563. failed_calculations += 1
  564. # 初始化失败尝试记录
  565. if "failed_metric_attempts" not in new_state:
  566. new_state["failed_metric_attempts"] = {}
  567. # 记录失败次数
  568. new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
  569. max_retries = 3
  570. if new_state["failed_metric_attempts"][metric_id] >= max_retries:
  571. print(f"⚠️ 指标 {metric_id} 异常已达到最大重试次数 ({max_retries}),从待计算列表中移除")
  572. if metric_id in new_state["pending_metric_ids"]:
  573. new_state["pending_metric_ids"].remove(metric_id)
  574. # 更新计算结果统计
  575. new_state["calculation_results"] = {
  576. "total_configs": len(pending_ids),
  577. "successful_calculations": successful_calculations,
  578. "failed_calculations": failed_calculations
  579. }
  580. # 添加消息
  581. if use_rules_engine_only:
  582. message_content = f"🧮 规则引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  583. elif use_traditional_engine_only:
  584. message_content = f"🧮 传统引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  585. else:
  586. message_content = f"🧮 指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败"
  587. new_state["messages"].append({
  588. "role": "assistant",
  589. "content": message_content,
  590. "timestamp": datetime.now().isoformat()
  591. })
  592. if use_rules_engine_only:
  593. print(f"✅ 规则引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  594. elif use_traditional_engine_only:
  595. print(f"✅ 传统引擎指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  596. else:
  597. print(f"✅ 指标计算完成:{successful_calculations} 成功,{failed_calculations} 失败")
  598. return convert_numpy_types(new_state)
  599. except Exception as e:
  600. print(f"❌ 指标计算节点失败: {e}")
  601. new_state = state.copy()
  602. new_state["errors"].append(f"指标计算错误: {str(e)}")
  603. return convert_numpy_types(new_state)
  604. async def _anomaly_recognition_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
  605. """异常识别节点"""
  606. try:
  607. print("🔍 正在执行异常识别...")
  608. # 检查是否已初始化异常识别智能体
  609. if self.anomaly_recognizer is None:
  610. print("🤖 初始化异常识别智能体...")
  611. from llmops.agents.anomaly_recognizer_agent import AnomalyRecognitionAgent
  612. from llmops.config import anomaly_recognizer_config
  613. self.anomaly_recognizer = AnomalyRecognitionAgent(
  614. csv_path=state["standardized_file_path"],
  615. api_key=self.api_key,
  616. base_url=self.base_url,
  617. model_name=self.model_name,
  618. config=anomaly_recognizer_config
  619. )
  620. # 加载交易数据
  621. print("📥 加载交易数据...")
  622. transaction_data = self.anomaly_recognizer.load_transaction_data()
  623. # 执行异常识别
  624. print("🔍 执行异常识别分析...")
  625. recognition_results = self.anomaly_recognizer.execute_full_recognition()
  626. # 生成异常报告
  627. print("📊 生成异常识别报告...")
  628. output_dir = "outputs/anomaly_reports"
  629. report_path = self.anomaly_recognizer.generate_recognition_report(output_dir)
  630. # 使用 update_state_with_anomaly_recognition 函数更新状态
  631. new_state = update_state_with_anomaly_recognition(
  632. state,
  633. recognition_results,
  634. report_path
  635. )
  636. print(f"✅ 异常识别完成:发现 {recognition_results.get('summary', {}).get('total_identified_anomalies', 0)} 条异常")
  637. return convert_numpy_types(new_state)
  638. except Exception as e:
  639. print(f"❌ 异常识别失败: {e}")
  640. import traceback
  641. traceback.print_exc()
  642. # 即使失败也标记为完成,避免阻塞流程
  643. new_state = state.copy()
  644. new_state["anomaly_recognition_completed"] = True # 标记为完成,避免卡住
  645. new_state["errors"].append(f"异常识别错误: {str(e)}")
  646. # 使用默认的异常摘要
  647. new_state["anomaly_summary"] = {
  648. "total_anomalies": 0,
  649. "anomaly_ratio": "0%",
  650. "severity_distribution": {},
  651. "anomaly_distribution": {}
  652. }
  653. # 添加消息
  654. new_state["messages"].append({
  655. "role": "assistant",
  656. "content": f"⚠️ 异常识别失败,跳过异常分析: {str(e)}",
  657. "timestamp": datetime.now().isoformat()
  658. })
  659. return convert_numpy_types(new_state)
  660. def _decision_to_route(self, decision: str) -> str:
  661. """将规划决策转换为路由"""
  662. decision_routes = {
  663. "data_classify": "data_classify",
  664. "generate_outline": "outline_generator",
  665. "compute_metrics": "metric_calculator",
  666. "finalize_report": END # 直接结束流程
  667. }
  668. return decision_routes.get(decision, "planning_node")
  669. def _format_decision_message(self, decision: Any) -> str:
  670. """格式化决策消息"""
  671. try:
  672. decision_type = getattr(decision, 'decision', 'unknown')
  673. reasoning = getattr(decision, 'reasoning', '')
  674. if decision_type == "compute_metrics" and hasattr(decision, 'metrics_to_compute'):
  675. metrics = decision.metrics_to_compute
  676. return f"🧮 规划决策:计算 {len(metrics)} 个指标"
  677. elif decision_type == "finalize_report":
  678. return f"✅ 规划决策:生成最终报告"
  679. elif decision_type == "generate_outline":
  680. return f"📋 规划决策:生成大纲"
  681. else:
  682. return f"🤔 规划决策:{decision_type}"
  683. except:
  684. return "🤔 规划决策已完成"
  685. async def run_workflow(self, question: str, industry: str, original_file_path: str, session_id: str = None, use_rules_engine_only: bool = False, use_traditional_engine_only: bool = False) -> Dict[str, Any]:
  686. """
  687. 运行完整的工作流
  688. Args:
  689. question: 用户查询
  690. industry: 行业
  691. original_file_path: 原始文件路径
  692. session_id: 会话ID
  693. use_rules_engine_only: 是否只使用规则引擎指标计算
  694. use_traditional_engine_only: 是否只使用传统引擎指标计算
  695. Returns:
  696. 工作流结果
  697. """
  698. try:
  699. print("🚀 启动完整智能体工作流...")
  700. print(f"问题:{question}")
  701. print(f"行业:{industry}")
  702. print(f"数据文件:{original_file_path}")
  703. if use_rules_engine_only:
  704. print("计算模式:只使用规则引擎")
  705. elif use_traditional_engine_only:
  706. print("计算模式:只使用传统引擎")
  707. else:
  708. print("计算模式:标准模式")
  709. # 创建初始状态
  710. initial_state = create_initial_integrated_state(question, industry, original_file_path, session_id)
  711. # 设置计算模式标记
  712. if use_rules_engine_only:
  713. initial_state["use_rules_engine_only"] = True
  714. initial_state["use_traditional_engine_only"] = False
  715. elif use_traditional_engine_only:
  716. initial_state["use_rules_engine_only"] = False
  717. initial_state["use_traditional_engine_only"] = True
  718. else:
  719. initial_state["use_rules_engine_only"] = False
  720. initial_state["use_traditional_engine_only"] = False
  721. # 编译工作流
  722. app = self.workflow.compile()
  723. # 执行工作流
  724. result = await app.ainvoke(initial_state)
  725. print("✅ 工作流执行完成")
  726. return {
  727. "success": True,
  728. "result": result,
  729. "answer": result.get("answer"),
  730. "report": result.get("report_draft"),
  731. "session_id": result.get("session_id"),
  732. "execution_summary": {
  733. "planning_steps": result.get("planning_step", 0),
  734. "outline_generated": result.get("outline_draft") is not None,
  735. "metrics_computed": len(result.get("computed_metrics", {})),
  736. "completion_rate": result.get("completeness_score", 0)
  737. }
  738. }
  739. except Exception as e:
  740. print(f"❌ 工作流执行失败: {e}")
  741. return {
  742. "success": False,
  743. "error": str(e),
  744. "result": None
  745. }
  746. # 便捷函数
  747. async def run_complete_agent_flow(question: str, industry: str, data: List[Dict[str, Any]], file_name: str, api_key: str, session_id: str = None, use_rules_engine_only: bool = False, use_traditional_engine_only: bool = False) -> Dict[str, Any]:
  748. """
  749. 运行完整智能体工作流的便捷函数
  750. Args:
  751. question: 用户查询
  752. data: 数据集
  753. file_name: 数据文件名称
  754. api_key: API密钥
  755. session_id: 会话ID
  756. use_rules_engine_only: 是否只使用规则引擎指标计算
  757. use_traditional_engine_only: 是否只使用传统引擎指标计算
  758. Returns:
  759. 工作流结果
  760. """
  761. workflow = CompleteAgentFlow(api_key)
  762. return await workflow.run_workflow(question, industry, data, file_name, session_id, use_rules_engine_only, use_traditional_engine_only)
  763. # 便捷函数
  764. async def run_flow(question: str, industry: str, original_file_path: str, api_key: str, base_url: str, model_name: str, session_id: str = None, use_rules_engine_only: bool = False, use_traditional_engine_only: bool = False) -> Dict[str, Any]:
  765. """
  766. 运行完整智能体工作流的便捷函数
  767. Args:
  768. question: 用户查询
  769. data: 数据集
  770. original_file_path: 原始文件路径(pdf/img/csv)
  771. api_key: API密钥
  772. base_url: LLM base url
  773. model_name: LLM model name
  774. session_id: 会话ID
  775. use_rules_engine_only: 是否只使用规则引擎指标计算
  776. use_traditional_engine_only: 是否只使用传统引擎指标计算
  777. Returns:
  778. 工作流结果
  779. """
  780. workflow = CompleteAgentFlow(api_key, base_url, model_name)
  781. return await workflow.run_workflow(question, industry, original_file_path, session_id, use_rules_engine_only, use_traditional_engine_only)
  782. # 主函数用于测试
  783. async def main():
  784. """主函数:执行系统测试"""
  785. import os
  786. os.environ["LANGCHAIN_TRACING_V2"] = "false"
  787. os.environ["LANGCHAIN_API_KEY"] = ""
  788. # 禁用 LangGraph 的追踪
  789. os.environ["LANGSMITH_TRACING"] = "false"
  790. print("🚀 执行CompleteAgentFlow系统测试")
  791. print("=" * 50)
  792. # 行业
  793. industry = "农业"
  794. # 测试文件(pdf/img/csv)
  795. file_name = "11111.png"
  796. curr_dir = os.path.dirname(os.path.abspath(__file__))
  797. file_path = os.path.join(curr_dir, "..", "data_files", file_name)
  798. print(f"使用LLM:{LLM_MODEL_NAME}")
  799. # 执行测试
  800. result = await run_flow(
  801. question="请生成一份详细的农业经营贷流水分析报告,需要包含:1.总收入和总支出统计 2.收入笔数和支出笔数 3.各类型收入支出占比分析 4.交易对手收入支出TOP3排名 5.按月份的收入支出趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标",
  802. industry = industry,
  803. original_file_path=file_path,
  804. api_key=LLM_API_KEY,
  805. base_url=LLM_BASE_URL,
  806. model_name=LLM_MODEL_NAME,
  807. session_id="direct-test"
  808. )
  809. print(f"📋 结果: {'✅ 成功' if result.get('success') else '❌ 失败'}")
  810. print(f"{result}")
  811. if result.get('success'):
  812. summary = result.get('execution_summary', {})
  813. print(f" 规划步骤: {summary.get('planning_steps', 0)}")
  814. print(f" 指标计算: {summary.get('metrics_computed', 0)}")
  815. print("🎉 测试成功!")
  816. return result
  817. if __name__ == "__main__":
  818. import asyncio
  819. asyncio.run(main())