workflow_state.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. """
  2. 整合的工作流状态定义
  3. ===================
  4. 此文件定义了整合了多个Agent的工作流状态,兼容现有的Big Agent状态管理和新增的报告生成Agent状态。
  5. 状态层次:
  6. 1. 输入层:用户查询和数据
  7. 2. 意图层:意图识别结果
  8. 3. 规划层:规划决策和大纲生成
  9. 4. 计算层:指标计算结果
  10. 5. 结果层:最终报告生成
  11. 6. 对话层:消息历史和错误处理
  12. 兼容性:
  13. - 兼容现有的Big Agent WorkflowState
  14. - 整合来自other_agents的AgentState
  15. - 支持扩展新的Agent状态需求
  16. 作者: Big Agent Team
  17. 版本: 1.0.0
  18. 创建时间: 2024-12-20
  19. """
  20. from typing import TypedDict, List, Dict, Any, Optional
  21. from datetime import datetime
  22. from langchain_core.messages import BaseMessage
  23. from pydantic import BaseModel, Field
  24. # ============= 数据模型 =============
  25. class MetricRequirement(BaseModel):
  26. """指标需求定义"""
  27. metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")
  28. metric_name: str = Field(description="指标中文名称")
  29. calculation_logic: str = Field(description="计算逻辑描述")
  30. required_fields: List[str] = Field(description="所需字段")
  31. dependencies: List[str] = Field(default_factory=list, description="依赖的其他指标ID")
  32. class ReportSection(BaseModel):
  33. """报告大纲章节"""
  34. section_id: str = Field(description="章节ID")
  35. title: str = Field(description="章节标题")
  36. description: str = Field(description="章节内容要求")
  37. metrics_needed: List[str] = Field(description="所需指标ID列表")
  38. class ReportOutline(BaseModel):
  39. """完整报告大纲"""
  40. report_title: str = Field(description="报告标题")
  41. sections: List[ReportSection] = Field(description="章节列表")
  42. global_metrics: List[MetricRequirement] = Field(description="全局指标列表")
  43. # ============= 序列化工具函数 =============
  44. def convert_numpy_types(obj: Any) -> Any:
  45. """
  46. 递归转换所有numpy类型为Python原生类型
  47. 确保所有数据可序列化
  48. """
  49. if isinstance(obj, dict):
  50. return {str(k): convert_numpy_types(v) for k, v in obj.items()}
  51. elif isinstance(obj, list):
  52. return [convert_numpy_types(item) for item in obj]
  53. elif isinstance(obj, tuple):
  54. return tuple(convert_numpy_types(item) for item in obj)
  55. elif isinstance(obj, set):
  56. return {convert_numpy_types(item) for item in obj}
  57. elif hasattr(obj, 'item') and hasattr(obj, 'dtype'): # numpy scalar
  58. return convert_numpy_types(obj.item())
  59. else:
  60. return obj
  61. # ============= 整合的工作流状态定义 =============
  62. class IntegratedWorkflowState(TypedDict):
  63. """整合的工作流状态定义 - 兼容多个Agent系统"""
  64. # === 基础输入层 (兼容Big Agent) ===
  65. user_input: str
  66. question: str # 别名,兼容报告生成Agent
  67. industry: str # 行业
  68. # === 数据层 ===
  69. data_set: List[Dict[str, Any]] # 报告生成Agent的数据格式
  70. transactions_df: Optional[Any] # 可选的数据框格式
  71. # === 意图识别层 (Big Agent原有) ===
  72. intent_result: Optional[Dict[str, Any]]
  73. # === 规划和大纲层 (新增) ===
  74. planning_step: int
  75. plan_history: List[str]
  76. outline_draft: Optional[ReportOutline]
  77. outline_version: int
  78. outline_ready: bool
  79. # === 指标计算层 ===
  80. metrics_requirements: List[MetricRequirement] # 报告生成Agent格式
  81. computed_metrics: Dict[str, Any] # 计算结果
  82. metrics_cache: Dict[str, Any] # 缓存
  83. pending_metric_ids: List[str] # 待计算指标ID
  84. failed_metric_attempts: Dict[str, int] # 失败统计
  85. calculation_results: Optional[Dict[str, Any]] # Big Agent格式的计算结果
  86. # === 结果层 ===
  87. report_draft: Dict[str, Any] # 报告草稿
  88. knowledge_result: Optional[Dict[str, Any]] # Big Agent知识沉淀结果
  89. is_complete: bool
  90. completeness_score: float
  91. answer: Optional[str] # 最终答案
  92. # === 对话和消息层 ===
  93. messages: List[Dict[str, Any]] # Big Agent消息格式
  94. current_node: str
  95. session_id: str
  96. next_route: str
  97. # === 错误处理层 ===
  98. errors: List[str]
  99. last_decision: str
  100. # === 时间跟踪层 ===
  101. start_time: str
  102. end_time: Optional[str]
  103. api_result: Dict[str, Any] # 存储所有API调用结果
  104. # ============= 状态创建和初始化函数 =============
  105. def create_initial_integrated_state(question: str, industry: str, data: List[Dict[str, Any]], session_id: str = None) -> IntegratedWorkflowState:
  106. """
  107. 创建初始的整合状态
  108. Args:
  109. question: 用户查询
  110. data: 数据集
  111. session_id: 会话ID
  112. Returns:
  113. 初始化后的状态
  114. """
  115. current_time = datetime.now().isoformat()
  116. session = session_id or f"session_{int(datetime.now().timestamp())}"
  117. return {
  118. # 基础输入
  119. "user_input": question,
  120. "question": question,
  121. "industry": industry,
  122. # 数据层
  123. "data_set": convert_numpy_types(data),
  124. # 规划和大纲层
  125. "planning_step": 0,
  126. "plan_history": [],
  127. "outline_draft": None,
  128. "outline_version": 0,
  129. "outline_ready": False,
  130. # 指标计算层
  131. "metrics_requirements": [],
  132. "computed_metrics": {},
  133. "metrics_cache": {},
  134. "pending_metric_ids": [],
  135. "failed_metric_attempts": {},
  136. "calculation_results": None,
  137. # 结果层
  138. "report_draft": {},
  139. "knowledge_result": None,
  140. "is_complete": False,
  141. "completeness_score": 0.0,
  142. "answer": None,
  143. # 对话和消息层
  144. "messages": [{
  145. "role": "user",
  146. "content": question,
  147. "timestamp": current_time
  148. }],
  149. "current_node": "start",
  150. "session_id": session,
  151. "next_route": "planning_node",
  152. # 错误处理层
  153. "errors": [],
  154. "last_decision": "init",
  155. # 时间跟踪层
  156. "start_time": current_time,
  157. "end_time": None,
  158. "api_result": {}, # 存储所有API调用结果
  159. # 计算模式配置层
  160. "use_rules_engine_only": False,
  161. "use_traditional_engine_only": False
  162. }
  163. def is_state_ready_for_calculation(state: IntegratedWorkflowState) -> bool:
  164. """
  165. 检查状态是否准备好进行指标计算
  166. Args:
  167. state: 当前状态
  168. Returns:
  169. 是否准备好
  170. """
  171. return (
  172. state.get("outline_draft") is not None and
  173. len(state.get("metrics_requirements", [])) > 0 and
  174. len(state.get("pending_metric_ids", [])) > 0
  175. )
  176. def get_calculation_progress(state: IntegratedWorkflowState) -> Dict[str, Any]:
  177. """
  178. 获取指标计算进度信息
  179. Args:
  180. state: 当前状态
  181. Returns:
  182. 进度信息
  183. """
  184. required = len(state.get("metrics_requirements", []))
  185. computed = len(state.get("computed_metrics", {}))
  186. pending = len(state.get("pending_metric_ids", []))
  187. return {
  188. "required_count": required,
  189. "computed_count": computed,
  190. "pending_count": pending,
  191. "coverage_rate": computed / required if required > 0 else 0,
  192. "is_complete": computed >= required * 0.8 # 80%覆盖率视为完成
  193. }
  194. def update_state_with_outline_generation(state: IntegratedWorkflowState, outline: ReportOutline) -> IntegratedWorkflowState:
  195. """
  196. 使用大纲生成结果更新状态
  197. Args:
  198. state: 当前状态
  199. outline: 生成的大纲
  200. Returns:
  201. 更新后的状态
  202. """
  203. new_state = state.copy()
  204. new_state["outline_draft"] = outline
  205. new_state["outline_version"] += 1
  206. new_state["outline_ready"] = True
  207. new_state["metrics_requirements"] = outline.global_metrics
  208. new_state["pending_metric_ids"] = [m.metric_id for m in outline.global_metrics]
  209. # 添加消息
  210. new_state["messages"].append({
  211. "role": "assistant",
  212. "content": f"✅ 大纲生成完成 v{new_state['outline_version']}:{outline.report_title}",
  213. "timestamp": datetime.now().isoformat()
  214. })
  215. return new_state
  216. def update_state_with_planning_decision(state: IntegratedWorkflowState, decision: Dict[str, Any]) -> IntegratedWorkflowState:
  217. """
  218. 使用规划决策结果更新状态
  219. Args:
  220. state: 当前状态
  221. decision: 规划决策
  222. Returns:
  223. 更新后的状态
  224. """
  225. new_state = state.copy()
  226. new_state["planning_step"] += 1
  227. new_state["last_decision"] = decision.get("decision", "unknown")
  228. new_state["next_route"] = decision.get("next_route", "planning_node")
  229. # 如果有待计算指标,更新待计算列表
  230. if decision.get("metrics_to_compute"):
  231. new_state["pending_metric_ids"] = decision["metrics_to_compute"]
  232. # 添加规划历史
  233. new_state["plan_history"].append(
  234. f"Step {new_state['planning_step']}: {decision.get('decision', 'unknown')}"
  235. )
  236. return new_state
  237. def finalize_state_with_report(state: IntegratedWorkflowState, final_report: Dict[str, Any]) -> IntegratedWorkflowState:
  238. """
  239. 使用最终报告完成状态
  240. Args:
  241. state: 当前状态
  242. final_report: 最终报告
  243. Returns:
  244. 完成的状态
  245. """
  246. new_state = state.copy()
  247. new_state["report_draft"] = final_report
  248. new_state["is_complete"] = True
  249. new_state["answer"] = final_report
  250. new_state["end_time"] = datetime.now().isoformat()
  251. # 计算完整性分数
  252. progress = get_calculation_progress(new_state)
  253. new_state["completeness_score"] = progress["coverage_rate"]
  254. return new_state