JiaQiang 1 неделя назад
Родитель
Сommit
b58d5ebd8f
1 измененных файлов с 328 добавлено и 0 удалено
  1. 328 0
      llmops/workflow_state.py

+ 328 - 0
llmops/workflow_state.py

@@ -0,0 +1,328 @@
+"""
+整合的工作流状态定义
+===================
+
+此文件定义了整合了多个Agent的工作流状态,兼容现有的Big Agent状态管理和新增的报告生成Agent状态。
+
+状态层次:
+1. 输入层:用户查询和数据
+2. 意图层:意图识别结果
+3. 规划层:规划决策和大纲生成
+4. 计算层:指标计算结果
+5. 结果层:最终报告生成
+6. 对话层:消息历史和错误处理
+
+兼容性:
+- 兼容现有的Big Agent WorkflowState
+- 整合来自other_agents的AgentState
+- 支持扩展新的Agent状态需求
+
+作者: Big Agent Team
+版本: 1.0.0
+创建时间: 2024-12-20
+"""
+
+from typing import TypedDict, List, Dict, Any, Optional
+from datetime import datetime
+from langchain_core.messages import BaseMessage
+from pydantic import BaseModel, Field
+
+
+# ============= 数据模型 =============
+
+class MetricRequirement(BaseModel):
+    """指标需求定义"""
+    metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")
+    metric_name: str = Field(description="指标中文名称")
+    calculation_logic: str = Field(description="计算逻辑描述")
+    required_fields: List[str] = Field(description="所需字段")
+    dependencies: List[str] = Field(default_factory=list, description="依赖的其他指标ID")
+
+
+class ReportSection(BaseModel):
+    """报告大纲章节"""
+    section_id: str = Field(description="章节ID")
+    title: str = Field(description="章节标题")
+    description: str = Field(description="章节内容要求")
+    metrics_needed: List[str] = Field(description="所需指标ID列表")
+
+
+class ReportOutline(BaseModel):
+    """完整报告大纲"""
+    report_title: str = Field(description="报告标题")
+    sections: List[ReportSection] = Field(description="章节列表")
+    global_metrics: List[MetricRequirement] = Field(description="全局指标列表")
+
+
+# ============= 序列化工具函数 =============
+
+def convert_numpy_types(obj: Any) -> Any:
+    """
+    递归转换所有numpy类型为Python原生类型
+    确保所有数据可序列化
+    """
+    if isinstance(obj, dict):
+        return {str(k): convert_numpy_types(v) for k, v in obj.items()}
+    elif isinstance(obj, list):
+        return [convert_numpy_types(item) for item in obj]
+    elif isinstance(obj, tuple):
+        return tuple(convert_numpy_types(item) for item in obj)
+    elif isinstance(obj, set):
+        return {convert_numpy_types(item) for item in obj}
+    elif hasattr(obj, 'item') and hasattr(obj, 'dtype'):  # numpy scalar
+        return convert_numpy_types(obj.item())
+    else:
+        return obj
+
+
+# ============= 整合的工作流状态定义 =============
+
+class IntegratedWorkflowState(TypedDict):
+    """整合的工作流状态定义 - 兼容多个Agent系统"""
+
+    # === 基础输入层 (兼容Big Agent) ===
+    user_input: str
+    question: str  # 别名,兼容报告生成Agent
+
+    industry: str  # 行业
+
+    # === 数据层 ===
+    data_set: List[Dict[str, Any]]  # 报告生成Agent的数据格式
+    transactions_df: Optional[Any]  # 可选的数据框格式
+
+    # === 意图识别层 (Big Agent原有) ===
+    intent_result: Optional[Dict[str, Any]]
+
+    # === 规划和大纲层 (新增) ===
+    planning_step: int
+    plan_history: List[str]
+    outline_draft: Optional[ReportOutline]
+    outline_version: int
+    outline_ready: bool
+
+    # === 指标计算层 ===
+    metrics_requirements: List[MetricRequirement]  # 报告生成Agent格式
+    computed_metrics: Dict[str, Any]  # 计算结果
+    metrics_cache: Dict[str, Any]  # 缓存
+    pending_metric_ids: List[str]  # 待计算指标ID
+    failed_metric_attempts: Dict[str, int]  # 失败统计
+    calculation_results: Optional[Dict[str, Any]]  # Big Agent格式的计算结果
+
+    # === 结果层 ===
+    report_draft: Dict[str, Any]  # 报告草稿
+    knowledge_result: Optional[Dict[str, Any]]  # Big Agent知识沉淀结果
+    is_complete: bool
+    completeness_score: float
+    answer: Optional[str]  # 最终答案
+
+    # === 对话和消息层 ===
+    messages: List[Dict[str, Any]]  # Big Agent消息格式
+    current_node: str
+    session_id: str
+    next_route: str
+
+    # === 错误处理层 ===
+    errors: List[str]
+    last_decision: str
+
+    # === 时间跟踪层 ===
+    start_time: str
+    end_time: Optional[str]
+    api_result: Dict[str, Any]  # 存储所有API调用结果
+
+
+# ============= 状态创建和初始化函数 =============
+
+def create_initial_integrated_state(question: str, industry: str, data: List[Dict[str, Any]], session_id: str = None) -> IntegratedWorkflowState:
+    """
+    创建初始的整合状态
+
+    Args:
+        question: 用户查询
+        data: 数据集
+        session_id: 会话ID
+
+    Returns:
+        初始化后的状态
+    """
+    current_time = datetime.now().isoformat()
+    session = session_id or f"session_{int(datetime.now().timestamp())}"
+
+    return {
+        # 基础输入
+        "user_input": question,
+        "question": question,
+        "industry": industry,
+
+        # 数据层
+        "data_set": convert_numpy_types(data),
+        "transactions_df": None,
+
+        # 意图识别层
+        "intent_result": None,
+
+        # 规划和大纲层
+        "planning_step": 0,
+        "plan_history": [],
+        "outline_draft": None,
+        "outline_version": 0,
+        "outline_ready": False,
+
+        # 指标计算层
+        "metrics_requirements": [],
+        "computed_metrics": {},
+        "metrics_cache": {},
+        "pending_metric_ids": [],
+        "failed_metric_attempts": {},
+        "calculation_results": None,
+
+        # 结果层
+        "report_draft": {},
+        "knowledge_result": None,
+        "is_complete": False,
+        "completeness_score": 0.0,
+        "answer": None,
+
+        # 对话和消息层
+        "messages": [{
+            "role": "user",
+            "content": question,
+            "timestamp": current_time
+        }],
+        "current_node": "start",
+        "session_id": session,
+        "next_route": "planning_node",
+
+        # 错误处理层
+        "errors": [],
+        "last_decision": "init",
+
+        # 时间跟踪层
+        "start_time": current_time,
+        "end_time": None,
+        "api_result": {},  # 存储所有API调用结果
+
+        # 计算模式配置层
+        "use_rules_engine_only": False,
+        "use_traditional_engine_only": False
+    }
+
+
+def is_state_ready_for_calculation(state: IntegratedWorkflowState) -> bool:
+    """
+    检查状态是否准备好进行指标计算
+
+    Args:
+        state: 当前状态
+
+    Returns:
+        是否准备好
+    """
+    return (
+        state.get("outline_draft") is not None and
+        len(state.get("metrics_requirements", [])) > 0 and
+        len(state.get("pending_metric_ids", [])) > 0
+    )
+
+
+def get_calculation_progress(state: IntegratedWorkflowState) -> Dict[str, Any]:
+    """
+    获取指标计算进度信息
+
+    Args:
+        state: 当前状态
+
+    Returns:
+        进度信息
+    """
+    required = len(state.get("metrics_requirements", []))
+    computed = len(state.get("computed_metrics", {}))
+    pending = len(state.get("pending_metric_ids", []))
+
+    return {
+        "required_count": required,
+        "computed_count": computed,
+        "pending_count": pending,
+        "coverage_rate": computed / required if required > 0 else 0,
+        "is_complete": computed >= required * 0.8  # 80%覆盖率视为完成
+    }
+
+
+def update_state_with_outline_generation(state: IntegratedWorkflowState, outline: ReportOutline) -> IntegratedWorkflowState:
+    """
+    使用大纲生成结果更新状态
+
+    Args:
+        state: 当前状态
+        outline: 生成的大纲
+
+    Returns:
+        更新后的状态
+    """
+    new_state = state.copy()
+    new_state["outline_draft"] = outline
+    new_state["outline_version"] += 1
+    new_state["outline_ready"] = True
+    new_state["metrics_requirements"] = outline.global_metrics
+    new_state["pending_metric_ids"] = [m.metric_id for m in outline.global_metrics]
+
+    # 添加消息
+    new_state["messages"].append({
+        "role": "assistant",
+        "content": f"✅ 大纲生成完成 v{new_state['outline_version']}:{outline.report_title}",
+        "timestamp": datetime.now().isoformat()
+    })
+
+    return new_state
+
+
+def update_state_with_planning_decision(state: IntegratedWorkflowState, decision: Dict[str, Any]) -> IntegratedWorkflowState:
+    """
+    使用规划决策结果更新状态
+
+    Args:
+        state: 当前状态
+        decision: 规划决策
+
+    Returns:
+        更新后的状态
+    """
+    new_state = state.copy()
+    new_state["planning_step"] += 1
+    new_state["last_decision"] = decision.get("decision", "unknown")
+    new_state["next_route"] = decision.get("next_route", "planning_node")
+
+    # 如果有待计算指标,更新待计算列表
+    if decision.get("metrics_to_compute"):
+        new_state["pending_metric_ids"] = decision["metrics_to_compute"]
+
+    # 添加规划历史
+    new_state["plan_history"].append(
+        f"Step {new_state['planning_step']}: {decision.get('decision', 'unknown')}"
+    )
+
+    return new_state
+
+
+def finalize_state_with_report(state: IntegratedWorkflowState, final_report: Dict[str, Any]) -> IntegratedWorkflowState:
+    """
+    使用最终报告完成状态
+
+    Args:
+        state: 当前状态
+        final_report: 最终报告
+
+    Returns:
+        完成的状态
+    """
+    new_state = state.copy()
+    new_state["report_draft"] = final_report
+    new_state["is_complete"] = True
+    new_state["answer"] = final_report
+    new_state["end_time"] = datetime.now().isoformat()
+
+    # 计算完整性分数
+    progress = get_calculation_progress(new_state)
+    new_state["completeness_score"] = progress["coverage_rate"]
+
+    return new_state