state_machine.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. #!/usr/bin/env python3
  2. """
  3. LangGraph状态机示例 - 学习工作流设计
  4. ==================================
  5. 这个文件展示了如何使用LangGraph创建状态机工作流,包含:
  6. 1. 状态定义
  7. 2. 节点函数
  8. 3. 条件路由
  9. 4. 工作流执行
  10. 运行方法:
  11. python examples/state_machine.py
  12. """
  13. import sys
  14. from typing import TypedDict, Annotated, List
  15. try:
  16. from langgraph.graph import StateGraph, START, END
  17. except ImportError as e:
  18. print(f"❌ 缺少依赖包: {e}")
  19. print("请运行: pip install langgraph")
  20. sys.exit(1)
  21. class AnalysisState(TypedDict):
  22. """分析工作流的状态"""
  23. question: str
  24. data: str
  25. current_step: str
  26. analysis_result: str
  27. report: str
  28. steps_completed: Annotated[List[str], "add"] # 使用注解支持列表追加
  29. def planning_node(state: AnalysisState) -> AnalysisState:
  30. """
  31. 规划节点 - 分析用户需求并制定计划
  32. 这是一个决策节点,决定下一步的分析方向
  33. """
  34. print("📋 规划阶段: 分析用户需求")
  35. question = state["question"].lower()
  36. # 简单的决策逻辑
  37. if "趋势" in question or "变化" in question:
  38. next_step = "trend_analysis"
  39. plan = "执行趋势分析"
  40. elif "对比" in question or "比较" in question:
  41. next_step = "comparison_analysis"
  42. plan = "执行对比分析"
  43. elif "汇总" in question or "总结" in question:
  44. next_step = "summary_analysis"
  45. plan = "执行汇总分析"
  46. else:
  47. next_step = "general_analysis"
  48. plan = "执行一般性分析"
  49. print(f" 决策: {plan}")
  50. return {
  51. **state,
  52. "current_step": next_step,
  53. "steps_completed": ["planning"]
  54. }
  55. def data_processing_node(state: AnalysisState) -> AnalysisState:
  56. """数据处理节点 - 准备数据用于分析"""
  57. print("🔧 数据处理阶段: 准备分析数据")
  58. # 模拟数据处理
  59. processed_data = f"已处理的数据: {state['data'][:50]}..."
  60. print(" 数据处理完成")
  61. return {
  62. **state,
  63. "data": processed_data,
  64. "steps_completed": ["data_processing"]
  65. }
  66. def trend_analysis_node(state: AnalysisState) -> AnalysisState:
  67. """趋势分析节点 - 专门处理趋势分析"""
  68. print("📈 趋势分析阶段: 识别数据变化趋势")
  69. # 模拟趋势分析
  70. result = f"趋势分析结果: 数据显示上升趋势,增长率为15%"
  71. print(f" 分析结果: {result}")
  72. return {
  73. **state,
  74. "analysis_result": result,
  75. "steps_completed": ["trend_analysis"]
  76. }
  77. def comparison_analysis_node(state: AnalysisState) -> AnalysisState:
  78. """对比分析节点 - 专门处理对比分析"""
  79. print("⚖️ 对比分析阶段: 比较不同数据集")
  80. # 模拟对比分析
  81. result = f"对比分析结果: A组数据优于B组数据,差异显著"
  82. print(f" 分析结果: {result}")
  83. return {
  84. **state,
  85. "analysis_result": result,
  86. "steps_completed": ["comparison_analysis"]
  87. }
  88. def general_analysis_node(state: AnalysisState) -> AnalysisState:
  89. """一般分析节点 - 处理通用分析需求"""
  90. print("🔍 一般分析阶段: 执行标准数据分析")
  91. # 模拟一般分析
  92. result = f"一般分析结果: 数据整体表现良好,符合预期"
  93. print(f" 分析结果: {result}")
  94. return {
  95. **state,
  96. "analysis_result": result,
  97. "steps_completed": ["general_analysis"]
  98. }
  99. def report_generation_node(state: AnalysisState) -> AnalysisState:
  100. """报告生成节点 - 生成最终分析报告"""
  101. print("📄 报告生成阶段: 整理分析结果")
  102. # 生成报告
  103. report = f"""
  104. 📊 数据分析报告
  105. ================
  106. 问题: {state['question']}
  107. 数据: {state['data']}
  108. 分析结果: {state['analysis_result']}
  109. 执行步骤: {', '.join(state['steps_completed'])}
  110. 结论: 分析完成,建议根据具体情况采取相应措施。
  111. """
  112. print(" 报告生成完成")
  113. return {
  114. **state,
  115. "report": report.strip(),
  116. "steps_completed": ["report_generation"]
  117. }
  118. def route_from_planning(state: AnalysisState) -> str:
  119. """
  120. 从规划节点路由到具体分析节点
  121. 这是一个条件路由函数,根据规划结果决定下一个节点
  122. """
  123. return state["current_step"]
  124. def create_analysis_workflow() -> StateGraph:
  125. """创建分析工作流"""
  126. # 创建状态图
  127. workflow = StateGraph(AnalysisState)
  128. # 添加节点
  129. workflow.add_node("planning", planning_node)
  130. workflow.add_node("data_processing", data_processing_node)
  131. workflow.add_node("trend_analysis", trend_analysis_node)
  132. workflow.add_node("comparison_analysis", comparison_analysis_node)
  133. workflow.add_node("general_analysis", general_analysis_node)
  134. workflow.add_node("report_generation", report_generation_node)
  135. # 设置入口点
  136. workflow.set_entry_point("planning")
  137. # 添加固定边
  138. workflow.add_edge("data_processing", "report_generation")
  139. workflow.add_edge("trend_analysis", "report_generation")
  140. workflow.add_edge("comparison_analysis", "report_generation")
  141. workflow.add_edge("general_analysis", "report_generation")
  142. workflow.add_edge("report_generation", END)
  143. # 添加条件边
  144. workflow.add_conditional_edges(
  145. "planning",
  146. route_from_planning,
  147. {
  148. "trend_analysis": "data_processing", # 趋势分析需要先处理数据
  149. "comparison_analysis": "data_processing", # 对比分析也需要先处理数据
  150. "general_analysis": "data_processing", # 一般分析也需要先处理数据
  151. }
  152. )
  153. return workflow
  154. def main():
  155. """主函数 - 演示状态机工作流"""
  156. print("🚀 LangGraph状态机示例")
  157. print("=" * 50)
  158. try:
  159. # 创建工作流
  160. workflow = create_analysis_workflow()
  161. app = workflow.compile()
  162. # 测试用例
  163. test_cases = [
  164. {
  165. "question": "这个季度的数据趋势如何?",
  166. "data": "Q1: 100, Q2: 120, Q3: 140, Q4: 160"
  167. },
  168. {
  169. "question": "比较A产品和B产品的销量",
  170. "data": "A产品: 500件, B产品: 450件"
  171. },
  172. {
  173. "question": "请分析这份数据",
  174. "data": "各类数据指标: 正常范围"
  175. }
  176. ]
  177. print("\n🧪 测试不同类型的分析:")
  178. print("-" * 40)
  179. for i, test_case in enumerate(test_cases, 1):
  180. print(f"\n📋 测试用例 {i}:")
  181. print(f"问题: {test_case['question']}")
  182. print(f"数据: {test_case['data']}")
  183. # 执行工作流
  184. result = app.invoke({
  185. "question": test_case["question"],
  186. "data": test_case["data"],
  187. "current_step": "",
  188. "analysis_result": "",
  189. "report": "",
  190. "steps_completed": []
  191. })
  192. print("\n✅ 执行完成:")
  193. print(f"执行步骤: {result['steps_completed']}")
  194. print(f"分析结果: {result['analysis_result']}")
  195. # 显示报告预览
  196. report_preview = result['report'][:200] + "..." if len(result['report']) > 200 else result['report']
  197. print(f"报告预览: {report_preview}")
  198. print("\n🎉 状态机示例完成!")
  199. print("\n💡 LangGraph学习要点:")
  200. print("1. 状态定义: 使用TypedDict定义工作流状态")
  201. print("2. 节点函数: 每个节点处理特定逻辑")
  202. print("3. 条件路由: 根据状态动态选择下一节点")
  203. print("4. 状态传递: 通过return更新状态")
  204. print("5. 工作流编译: 调用compile()创建可执行应用")
  205. print("\n📚 下一步学习:")
  206. print("- 查看 examples/advanced_agent.py - 学习高级Agent功能")
  207. print("- 阅读 llmops/ 目录下的实际Agent代码")
  208. print("- 学习 PRACTICE_GUIDE.md 中的Phase 3内容")
  209. except Exception as e:
  210. print(f"❌ 运行出错: {e}")
  211. print("\n🔧 故障排除:")
  212. print("1. 确认已安装langgraph: pip install langgraph")
  213. print("2. 检查Python版本是否支持(推荐3.10+)")
  214. if __name__ == "__main__":
  215. main()