| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- """
- 整合的工作流状态定义
- ===================
- 此文件定义了整合了多个Agent的工作流状态,兼容现有的Big Agent状态管理和新增的报告生成Agent状态。
- 状态层次:
- 1. 输入层:用户查询和数据
- 2. 意图层:意图识别结果
- 3. 规划层:规划决策和大纲生成
- 4. 计算层:指标计算结果
- 5. 结果层:最终报告生成
- 6. 对话层:消息历史和错误处理
- 兼容性:
- - 兼容现有的Big Agent WorkflowState
- - 整合来自other_agents的AgentState
- - 支持扩展新的Agent状态需求
- 作者: Big Agent Team
- 版本: 1.0.0
- 创建时间: 2024-12-20
- """
- from typing import TypedDict, List, Dict, Any, Optional
- from datetime import datetime
- from langchain_core.messages import BaseMessage
- from pydantic import BaseModel, Field
- # ============= 数据模型 =============
- class MetricRequirement(BaseModel):
- """指标需求定义"""
- metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")
- metric_name: str = Field(description="指标中文名称")
- calculation_logic: str = Field(description="计算逻辑描述")
- required_fields: List[str] = Field(description="所需字段")
- dependencies: List[str] = Field(default_factory=list, description="依赖的其他指标ID")
- class ReportSection(BaseModel):
- """报告大纲章节"""
- section_id: str = Field(description="章节ID")
- title: str = Field(description="章节标题")
- description: str = Field(description="章节内容要求")
- metrics_needed: List[str] = Field(description="所需指标ID列表")
- class ReportOutline(BaseModel):
- """完整报告大纲"""
- report_title: str = Field(description="报告标题")
- sections: List[ReportSection] = Field(description="章节列表")
- global_metrics: List[MetricRequirement] = Field(description="全局指标列表")
- # ============= 序列化工具函数 =============
- def convert_numpy_types(obj: Any) -> Any:
- """
- 递归转换所有numpy类型为Python原生类型
- 确保所有数据可序列化
- """
- if isinstance(obj, dict):
- return {str(k): convert_numpy_types(v) for k, v in obj.items()}
- elif isinstance(obj, list):
- return [convert_numpy_types(item) for item in obj]
- elif isinstance(obj, tuple):
- return tuple(convert_numpy_types(item) for item in obj)
- elif isinstance(obj, set):
- return {convert_numpy_types(item) for item in obj}
- elif hasattr(obj, 'item') and hasattr(obj, 'dtype'): # numpy scalar
- return convert_numpy_types(obj.item())
- else:
- return obj
- # ============= 整合的工作流状态定义 =============
- class IntegratedWorkflowState(TypedDict):
- """整合的工作流状态定义 - 兼容多个Agent系统"""
- # === 基础输入层 (兼容Big Agent) ===
- user_input: str
- question: str # 别名,兼容报告生成Agent
- industry: str # 行业
- # === 数据层 ===
- data_set: List[Dict[str, Any]] # 报告生成Agent的数据格式
- transactions_df: Optional[Any] # 可选的数据框格式
- # === 意图识别层 (Big Agent原有) ===
- intent_result: Optional[Dict[str, Any]]
- # === 规划和大纲层 (新增) ===
- planning_step: int
- plan_history: List[str]
- outline_draft: Optional[ReportOutline]
- outline_version: int
- outline_ready: bool
- # === 指标计算层 ===
- metrics_requirements: List[MetricRequirement] # 报告生成Agent格式
- computed_metrics: Dict[str, Any] # 计算结果
- metrics_cache: Dict[str, Any] # 缓存
- pending_metric_ids: List[str] # 待计算指标ID
- failed_metric_attempts: Dict[str, int] # 失败统计
- calculation_results: Optional[Dict[str, Any]] # Big Agent格式的计算结果
- # === 结果层 ===
- report_draft: Dict[str, Any] # 报告草稿
- knowledge_result: Optional[Dict[str, Any]] # Big Agent知识沉淀结果
- is_complete: bool
- completeness_score: float
- answer: Optional[str] # 最终答案
- # === 对话和消息层 ===
- messages: List[Dict[str, Any]] # Big Agent消息格式
- current_node: str
- session_id: str
- next_route: str
- # === 错误处理层 ===
- errors: List[str]
- last_decision: str
- # === 时间跟踪层 ===
- start_time: str
- end_time: Optional[str]
- api_result: Dict[str, Any] # 存储所有API调用结果
- # ============= 状态创建和初始化函数 =============
- def create_initial_integrated_state(question: str, industry: str, data: List[Dict[str, Any]], session_id: str = None) -> IntegratedWorkflowState:
- """
- 创建初始的整合状态
- Args:
- question: 用户查询
- data: 数据集
- session_id: 会话ID
- Returns:
- 初始化后的状态
- """
- current_time = datetime.now().isoformat()
- session = session_id or f"session_{int(datetime.now().timestamp())}"
- return {
- # 基础输入
- "user_input": question,
- "question": question,
- "industry": industry,
- # 数据层
- "data_set": convert_numpy_types(data),
- "transactions_df": None,
- # 意图识别层
- "intent_result": None,
- # 规划和大纲层
- "planning_step": 0,
- "plan_history": [],
- "outline_draft": None,
- "outline_version": 0,
- "outline_ready": False,
- # 指标计算层
- "metrics_requirements": [],
- "computed_metrics": {},
- "metrics_cache": {},
- "pending_metric_ids": [],
- "failed_metric_attempts": {},
- "calculation_results": None,
- # 结果层
- "report_draft": {},
- "knowledge_result": None,
- "is_complete": False,
- "completeness_score": 0.0,
- "answer": None,
- # 对话和消息层
- "messages": [{
- "role": "user",
- "content": question,
- "timestamp": current_time
- }],
- "current_node": "start",
- "session_id": session,
- "next_route": "planning_node",
- # 错误处理层
- "errors": [],
- "last_decision": "init",
- # 时间跟踪层
- "start_time": current_time,
- "end_time": None,
- "api_result": {}, # 存储所有API调用结果
- # 计算模式配置层
- "use_rules_engine_only": False,
- "use_traditional_engine_only": False
- }
- def is_state_ready_for_calculation(state: IntegratedWorkflowState) -> bool:
- """
- 检查状态是否准备好进行指标计算
- Args:
- state: 当前状态
- Returns:
- 是否准备好
- """
- return (
- state.get("outline_draft") is not None and
- len(state.get("metrics_requirements", [])) > 0 and
- len(state.get("pending_metric_ids", [])) > 0
- )
- def get_calculation_progress(state: IntegratedWorkflowState) -> Dict[str, Any]:
- """
- 获取指标计算进度信息
- Args:
- state: 当前状态
- Returns:
- 进度信息
- """
- required = len(state.get("metrics_requirements", []))
- computed = len(state.get("computed_metrics", {}))
- pending = len(state.get("pending_metric_ids", []))
- return {
- "required_count": required,
- "computed_count": computed,
- "pending_count": pending,
- "coverage_rate": computed / required if required > 0 else 0,
- "is_complete": computed >= required * 0.8 # 80%覆盖率视为完成
- }
- def update_state_with_outline_generation(state: IntegratedWorkflowState, outline: ReportOutline) -> IntegratedWorkflowState:
- """
- 使用大纲生成结果更新状态
- Args:
- state: 当前状态
- outline: 生成的大纲
- Returns:
- 更新后的状态
- """
- new_state = state.copy()
- new_state["outline_draft"] = outline
- new_state["outline_version"] += 1
- new_state["outline_ready"] = True
- new_state["metrics_requirements"] = outline.global_metrics
- new_state["pending_metric_ids"] = [m.metric_id for m in outline.global_metrics]
- # 添加消息
- new_state["messages"].append({
- "role": "assistant",
- "content": f"✅ 大纲生成完成 v{new_state['outline_version']}:{outline.report_title}",
- "timestamp": datetime.now().isoformat()
- })
- return new_state
- def update_state_with_planning_decision(state: IntegratedWorkflowState, decision: Dict[str, Any]) -> IntegratedWorkflowState:
- """
- 使用规划决策结果更新状态
- Args:
- state: 当前状态
- decision: 规划决策
- Returns:
- 更新后的状态
- """
- new_state = state.copy()
- new_state["planning_step"] += 1
- new_state["last_decision"] = decision.get("decision", "unknown")
- new_state["next_route"] = decision.get("next_route", "planning_node")
- # 如果有待计算指标,更新待计算列表
- if decision.get("metrics_to_compute"):
- new_state["pending_metric_ids"] = decision["metrics_to_compute"]
- # 添加规划历史
- new_state["plan_history"].append(
- f"Step {new_state['planning_step']}: {decision.get('decision', 'unknown')}"
- )
- return new_state
- def finalize_state_with_report(state: IntegratedWorkflowState, final_report: Dict[str, Any]) -> IntegratedWorkflowState:
- """
- 使用最终报告完成状态
- Args:
- state: 当前状态
- final_report: 最终报告
- Returns:
- 完成的状态
- """
- new_state = state.copy()
- new_state["report_draft"] = final_report
- new_state["is_complete"] = True
- new_state["answer"] = final_report
- new_state["end_time"] = datetime.now().isoformat()
- # 计算完整性分数
- progress = get_calculation_progress(new_state)
- new_state["completeness_score"] = progress["coverage_rate"]
- return new_state
|