workflow_state.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. """
  2. 整合的工作流状态定义
  3. ===================
  4. 此文件定义了整合了多个Agent的工作流状态,兼容现有的Big Agent状态管理和新增的报告生成Agent状态。
  5. 状态层次:
  6. 1. 输入层:用户查询和数据
  7. 2. 意图层:意图识别结果
  8. 3. 规划层:规划决策和大纲生成
  9. 4. 计算层:指标计算结果
  10. 5. 结果层:最终报告生成
  11. 6. 对话层:消息历史和错误处理
  12. 兼容性:
  13. - 兼容现有的Big Agent WorkflowState
  14. - 整合来自other_agents的AgentState
  15. - 支持扩展新的Agent状态需求
  16. 作者: Big Agent Team
  17. 版本: 1.0.0
  18. 创建时间: 2024-12-20
  19. """
  20. from typing import TypedDict, List, Dict, Any, Optional
  21. from datetime import datetime
  22. from langchain_core.messages import BaseMessage
  23. from pydantic import BaseModel, Field
  24. # ============= 数据模型 =============
  25. class MetricRequirement(BaseModel):
  26. """指标需求定义"""
  27. metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")
  28. metric_name: str = Field(description="指标中文名称")
  29. calculation_logic: str = Field(description="计算逻辑描述")
  30. required_fields: List[str] = Field(description="所需字段")
  31. dependencies: List[str] = Field(default_factory=list, description="依赖的其他指标ID")
  32. class ReportSection(BaseModel):
  33. """报告大纲章节"""
  34. section_id: str = Field(description="章节ID")
  35. title: str = Field(description="章节标题")
  36. description: str = Field(description="章节内容要求")
  37. metrics_needed: List[str] = Field(description="所需指标ID列表")
  38. class ReportOutline(BaseModel):
  39. """完整报告大纲"""
  40. report_title: str = Field(description="报告标题")
  41. sections: List[ReportSection] = Field(description="章节列表")
  42. global_metrics: List[MetricRequirement] = Field(description="全局指标列表")
  43. # ============= 序列化工具函数 =============
  44. def convert_numpy_types(obj: Any) -> Any:
  45. """
  46. 递归转换所有numpy类型为Python原生类型
  47. 确保所有数据可序列化
  48. """
  49. if isinstance(obj, dict):
  50. return {str(k): convert_numpy_types(v) for k, v in obj.items()}
  51. elif isinstance(obj, list):
  52. return [convert_numpy_types(item) for item in obj]
  53. elif isinstance(obj, tuple):
  54. return tuple(convert_numpy_types(item) for item in obj)
  55. elif isinstance(obj, set):
  56. return {convert_numpy_types(item) for item in obj}
  57. elif hasattr(obj, 'item') and hasattr(obj, 'dtype'): # numpy scalar
  58. return convert_numpy_types(obj.item())
  59. else:
  60. return obj
  61. # ============= 整合的工作流状态定义 =============
  62. class IntegratedWorkflowState(TypedDict):
  63. """整合的工作流状态定义 - 兼容多个Agent系统"""
  64. # === 基础输入层 (兼容Big Agent) ===
  65. user_input: str
  66. question: str # 别名,兼容报告生成Agent
  67. industry: str # 行业
  68. # === 数据层 ===
  69. data_set: List[Dict[str, Any]] # 报告生成Agent的数据格式
  70. transactions_df: Optional[Any] # 可选的数据框格式
  71. # === 意图识别层 (Big Agent原有) ===
  72. intent_result: Optional[Dict[str, Any]]
  73. # === 规划和大纲层 (新增) ===
  74. planning_step: int
  75. plan_history: List[str]
  76. outline_draft: Optional[ReportOutline]
  77. outline_version: int
  78. outline_ready: bool
  79. # === 指标计算层 ===
  80. metrics_requirements: List[MetricRequirement] # 报告生成Agent格式
  81. computed_metrics: Dict[str, Any] # 计算结果
  82. metrics_cache: Dict[str, Any] # 缓存
  83. pending_metric_ids: List[str] # 待计算指标ID
  84. failed_metric_attempts: Dict[str, int] # 失败统计
  85. calculation_results: Optional[Dict[str, Any]] # Big Agent格式的计算结果
  86. # === 结果层 ===
  87. report_draft: Dict[str, Any] # 报告草稿
  88. knowledge_result: Optional[Dict[str, Any]] # Big Agent知识沉淀结果
  89. is_complete: bool
  90. completeness_score: float
  91. answer: Optional[str] # 最终答案
  92. # === 对话和消息层 ===
  93. messages: List[Dict[str, Any]] # Big Agent消息格式
  94. current_node: str
  95. session_id: str
  96. next_route: str
  97. # === 错误处理层 ===
  98. errors: List[str]
  99. last_decision: str
  100. # === 时间跟踪层 ===
  101. start_time: str
  102. end_time: Optional[str]
  103. api_result: Dict[str, Any] # 存储所有API调用结果
  104. # ============= 状态创建和初始化函数 =============
  105. def create_initial_integrated_state(question: str, industry: str, data: List[Dict[str, Any]], session_id: str = None) -> IntegratedWorkflowState:
  106. """
  107. 创建初始的整合状态
  108. Args:
  109. question: 用户查询
  110. data: 数据集
  111. session_id: 会话ID
  112. Returns:
  113. 初始化后的状态
  114. """
  115. current_time = datetime.now().isoformat()
  116. session = session_id or f"session_{int(datetime.now().timestamp())}"
  117. return {
  118. # 基础输入
  119. "user_input": question,
  120. "question": question,
  121. "industry": industry,
  122. # 数据层
  123. "data_set": convert_numpy_types(data),
  124. "transactions_df": None,
  125. # 意图识别层
  126. "intent_result": None,
  127. # 规划和大纲层
  128. "planning_step": 0,
  129. "plan_history": [],
  130. "outline_draft": None,
  131. "outline_version": 0,
  132. "outline_ready": False,
  133. # 指标计算层
  134. "metrics_requirements": [],
  135. "computed_metrics": {},
  136. "metrics_cache": {},
  137. "pending_metric_ids": [],
  138. "failed_metric_attempts": {},
  139. "calculation_results": None,
  140. # 结果层
  141. "report_draft": {},
  142. "knowledge_result": None,
  143. "is_complete": False,
  144. "completeness_score": 0.0,
  145. "answer": None,
  146. # 对话和消息层
  147. "messages": [{
  148. "role": "user",
  149. "content": question,
  150. "timestamp": current_time
  151. }],
  152. "current_node": "start",
  153. "session_id": session,
  154. "next_route": "planning_node",
  155. # 错误处理层
  156. "errors": [],
  157. "last_decision": "init",
  158. # 时间跟踪层
  159. "start_time": current_time,
  160. "end_time": None,
  161. "api_result": {}, # 存储所有API调用结果
  162. # 计算模式配置层
  163. "use_rules_engine_only": False,
  164. "use_traditional_engine_only": False
  165. }
  166. def is_state_ready_for_calculation(state: IntegratedWorkflowState) -> bool:
  167. """
  168. 检查状态是否准备好进行指标计算
  169. Args:
  170. state: 当前状态
  171. Returns:
  172. 是否准备好
  173. """
  174. return (
  175. state.get("outline_draft") is not None and
  176. len(state.get("metrics_requirements", [])) > 0 and
  177. len(state.get("pending_metric_ids", [])) > 0
  178. )
  179. def get_calculation_progress(state: IntegratedWorkflowState) -> Dict[str, Any]:
  180. """
  181. 获取指标计算进度信息
  182. Args:
  183. state: 当前状态
  184. Returns:
  185. 进度信息
  186. """
  187. required = len(state.get("metrics_requirements", []))
  188. computed = len(state.get("computed_metrics", {}))
  189. pending = len(state.get("pending_metric_ids", []))
  190. return {
  191. "required_count": required,
  192. "computed_count": computed,
  193. "pending_count": pending,
  194. "coverage_rate": computed / required if required > 0 else 0,
  195. "is_complete": computed >= required * 0.8 # 80%覆盖率视为完成
  196. }
  197. def update_state_with_outline_generation(state: IntegratedWorkflowState, outline: ReportOutline) -> IntegratedWorkflowState:
  198. """
  199. 使用大纲生成结果更新状态
  200. Args:
  201. state: 当前状态
  202. outline: 生成的大纲
  203. Returns:
  204. 更新后的状态
  205. """
  206. new_state = state.copy()
  207. new_state["outline_draft"] = outline
  208. new_state["outline_version"] += 1
  209. new_state["outline_ready"] = True
  210. new_state["metrics_requirements"] = outline.global_metrics
  211. new_state["pending_metric_ids"] = [m.metric_id for m in outline.global_metrics]
  212. # 添加消息
  213. new_state["messages"].append({
  214. "role": "assistant",
  215. "content": f"✅ 大纲生成完成 v{new_state['outline_version']}:{outline.report_title}",
  216. "timestamp": datetime.now().isoformat()
  217. })
  218. return new_state
  219. def update_state_with_planning_decision(state: IntegratedWorkflowState, decision: Dict[str, Any]) -> IntegratedWorkflowState:
  220. """
  221. 使用规划决策结果更新状态
  222. Args:
  223. state: 当前状态
  224. decision: 规划决策
  225. Returns:
  226. 更新后的状态
  227. """
  228. new_state = state.copy()
  229. new_state["planning_step"] += 1
  230. new_state["last_decision"] = decision.get("decision", "unknown")
  231. new_state["next_route"] = decision.get("next_route", "planning_node")
  232. # 如果有待计算指标,更新待计算列表
  233. if decision.get("metrics_to_compute"):
  234. new_state["pending_metric_ids"] = decision["metrics_to_compute"]
  235. # 添加规划历史
  236. new_state["plan_history"].append(
  237. f"Step {new_state['planning_step']}: {decision.get('decision', 'unknown')}"
  238. )
  239. return new_state
  240. def finalize_state_with_report(state: IntegratedWorkflowState, final_report: Dict[str, Any]) -> IntegratedWorkflowState:
  241. """
  242. 使用最终报告完成状态
  243. Args:
  244. state: 当前状态
  245. final_report: 最终报告
  246. Returns:
  247. 完成的状态
  248. """
  249. new_state = state.copy()
  250. new_state["report_draft"] = final_report
  251. new_state["is_complete"] = True
  252. new_state["answer"] = final_report
  253. new_state["end_time"] = datetime.now().isoformat()
  254. # 计算完整性分数
  255. progress = get_calculation_progress(new_state)
  256. new_state["completeness_score"] = progress["coverage_rate"]
  257. return new_state