graph.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. from langgraph.graph import StateGraph, START, END
  2. from langgraph.checkpoint.memory import MemorySaver
  3. from llmops.agents.state import AgentState, convert_numpy_types
  4. from llmops.agents.planning_agent import planning_node
  5. from llmops.agents.outline_agent import outline_node
  6. from llmops.agents.metrics_agent import metrics_node
  7. def create_report_generation_graph():
  8. """构建报告生成图"""
  9. workflow = StateGraph(AgentState)
  10. # 添加节点
  11. workflow.add_node("planning_node", planning_node)
  12. workflow.add_node("outline_generator", outline_node)
  13. workflow.add_node("metrics_calculator", metrics_node)
  14. workflow.add_node("report_compiler", compile_final_report)
  15. workflow.add_node("clarify_node", handle_clarification)
  16. # 设置入口
  17. workflow.add_edge(START, "planning_node")
  18. # 条件边:根据规划节点返回的状态路由
  19. workflow.add_conditional_edges(
  20. "planning_node",
  21. route_from_planning,
  22. {
  23. "outline_generator": "outline_generator",
  24. "metrics_calculator": "metrics_calculator",
  25. "report_compiler": "report_compiler",
  26. "clarify_node": "clarify_node",
  27. "planning_node": "planning_node", # 继续循环
  28. "END": END
  29. }
  30. )
  31. # 返回规划节点重新决策
  32. workflow.add_edge("outline_generator", "planning_node")
  33. workflow.add_edge("metrics_calculator", "planning_node")
  34. workflow.add_edge("clarify_node", "planning_node")
  35. # 报告编译后结束
  36. workflow.add_edge("report_compiler", END)
  37. # 编译图
  38. return workflow.compile(
  39. checkpointer=MemorySaver(),
  40. interrupt_before=[],
  41. interrupt_after=[]
  42. )
  43. def route_from_planning(state: AgentState) -> str:
  44. """
  45. 从规划节点路由到下一个节点
  46. 返回目标节点名称
  47. """
  48. print(f"\n🔍 [路由决策] 步骤={state['planning_step']}, "
  49. f"大纲版本={state['outline_version']}, "
  50. f"大纲已生成={state.get('outline_draft') is not None}, "
  51. f"指标需求={len(state.get('metrics_requirements', []))}, "
  52. f"已计算={len(state.get('computed_metrics', {}))}")
  53. # 新增:防止无限循环
  54. if state['planning_step'] > 50:
  55. print("⚠️ 规划步骤超过50次,强制终止并生成报告")
  56. return "report_compiler"
  57. # 如果大纲为空 → 生成大纲
  58. if not state.get("outline_draft"):
  59. print("→ 路由到 outline_generator(大纲为空)")
  60. return "outline_generator"
  61. # 如果指标需求为空 → 重新生成大纲
  62. if not state.get("metrics_requirements"):
  63. print("→ 路由到 outline_generator(指标需求为空)")
  64. return "outline_generator"
  65. # 计算覆盖率
  66. required = len(state["metrics_requirements"])
  67. computed = len(state["computed_metrics"])
  68. coverage = computed / required if required > 0 else 0
  69. print(f" 指标覆盖率 = {computed}/{required} = {coverage:.2%}")
  70. # 新增:如果规划步骤过多且覆盖率超过50%,强制生成报告
  71. if state['planning_step'] > 30 and coverage > 0.5:
  72. print(f"→ 路由到 report_compiler(步骤过多,强制终止,覆盖率={coverage:.2%})")
  73. return "report_compiler"
  74. # 如果覆盖率 < 80% → 计算指标
  75. if coverage < 0.8:
  76. print(f"→ 路由到 metrics_calculator(覆盖率={coverage:.2%} < 80%)")
  77. return "metrics_calculator"
  78. # 如果覆盖率 ≥ 80% → 生成报告
  79. print(f"→ 路由到 report_compiler(覆盖率={coverage:.2%} ≥ 80%)")
  80. return "report_compiler"
  81. def compile_final_report(state: AgentState) -> AgentState:
  82. """报告编译节点:整合所有结果"""
  83. # 关键修复:将Pydantic模型转换为字典
  84. outline = state["outline_draft"]
  85. if hasattr(outline, 'dict'):
  86. outline_dict = outline.dict()
  87. else:
  88. outline_dict = outline
  89. metrics = state["computed_metrics"]
  90. # 按章节组织内容
  91. sections = []
  92. for section in outline_dict["sections"]:
  93. section_metrics = {
  94. mid: metrics.get(mid, "数据缺失")
  95. for mid in section["metrics_needed"]
  96. }
  97. sections.append({
  98. "title": section["title"],
  99. "description": section["description"],
  100. "metrics": section_metrics
  101. })
  102. final_report = {
  103. "title": outline_dict["report_title"],
  104. "sections": sections,
  105. "summary": {
  106. "total_metrics": len(metrics),
  107. "required_metrics": len(outline_dict["global_metrics"]),
  108. "coverage_rate": float(state["completeness_score"]),
  109. "planning_iterations": int(state["planning_step"])
  110. }
  111. }
  112. result_state = {
  113. **state,
  114. "answer": final_report,
  115. "status": "success",
  116. "messages": state["messages"] + [("ai", f"🎉 报告生成完成:{outline_dict['report_title']}")]
  117. }
  118. # 关键修复:返回前清理状态
  119. return convert_numpy_types(result_state)
  120. def handle_clarification(state: AgentState) -> AgentState:
  121. """澄清处理节点"""
  122. result_state = {
  123. **state,
  124. "status": "clarifying",
  125. "is_complete": True,
  126. "answer": "需要更多信息,请明确您的报告需求"
  127. }
  128. # 关键修复:返回前清理状态
  129. return convert_numpy_types(result_state)