Sfoglia il codice sorgente

启动逻辑还原到11点的版本

wangyang 2 giorni fa
parent
commit
7616e0a1b1
1 ha cambiato i file con 63 aggiunte e 61 eliminazioni
  1. 63 61
      llmops/complete_agent_flow_rule.py

+ 63 - 61
llmops/complete_agent_flow_rule.py

@@ -32,7 +32,7 @@ from typing import Dict, Any, List
 from datetime import datetime
 from langgraph.graph import StateGraph, START, END
 
-from llmops.workflow_state import (
+from workflow_state import (
     IntegratedWorkflowState,
     create_initial_integrated_state,
     get_calculation_progress,
@@ -40,6 +40,7 @@ from llmops.workflow_state import (
     update_state_with_planning_decision,
     update_state_with_data_classified,
     convert_numpy_types,
+
 )
 from llmops.agents.outline_agent import  generate_report_outline
 from llmops.agents.planning_agent import  plan_next_action
@@ -75,6 +76,7 @@ class CompleteAgentFlow:
         # 添加节点
         workflow.add_node("planning_node", self._planning_node)
         workflow.add_node("outline_generator", self._outline_generator_node)
+        workflow.add_node("metric_calculator", self._metric_calculator_node)
         workflow.add_node("data_classify", self._data_classify_node)
 
         # 设置入口点
@@ -145,10 +147,22 @@ class CompleteAgentFlow:
             print(f"→ 路由到 metric_calculator(计算指标,覆盖率={coverage:.2%})")
             return "metric_calculator"
 
-        # 如果没有待计算指标或覆盖率 >= 80% → 生成最终报告
-        if not state.get("pending_metric_ids") or coverage >= 0.8:
-            print(f"→ 路由到 report_finalizer(生成最终报告,覆盖率={coverage:.2%})")
-            return "report_finalizer"
+        # 检查是否应该结束流程
+        pending_ids = state.get("pending_metric_ids", [])
+        failed_attempts = state.get("failed_metric_attempts", {})
+        max_retries = 3
+
+        # 计算还有哪些指标可以重试(未达到最大重试次数)
+        retryable_metrics = [
+            mid for mid in pending_ids
+            if failed_attempts.get(mid, 0) < max_retries
+        ]
+
+        # 如果覆盖率 >= 80%,或者没有可重试的指标 → 结束流程
+        if coverage >= 0.8 or not retryable_metrics:
+            reason = "覆盖率达到80%" if coverage >= 0.8 else "没有可重试指标"
+            print(f"→ 结束流程(覆盖率={coverage:.2%},原因:{reason})")
+            return END
 
         # 默认返回规划节点
         return "planning_node"
@@ -299,50 +313,6 @@ class CompleteAgentFlow:
         print('   能够根据具体业务场景动态调整分析框架,确保分析的针对性和有效性。')
         print()
 
-    async def _metric_evaluator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
-        """指标评估节点:根据大纲确定需要计算的指标"""
-        try:
-            print("🔍 正在评估指标需求...")
-
-            new_state = state.copy()
-            outline = state.get("outline_draft")
-
-            if not outline:
-                print("⚠️ 没有大纲信息,跳过指标评估")
-                return convert_numpy_types(new_state)
-
-            # 从大纲中提取指标需求
-            metrics_requirements = outline.global_metrics
-            metric_ids = [m.metric_id for m in metrics_requirements]
-
-            # 设置待计算指标
-            new_state["metrics_requirements"] = metrics_requirements
-            new_state["pending_metric_ids"] = metric_ids.copy()
-            new_state["computed_metrics"] = {}
-            new_state["metrics_cache"] = {}
-
-            print(f"✅ 指标评估完成,发现 {len(metric_ids)} 个待计算指标")
-            for i, metric_id in enumerate(metric_ids[:5], 1):  # 只显示前5个
-                print(f"   {i}. {metric_id}")
-
-            if len(metric_ids) > 5:
-                print(f"   ... 还有 {len(metric_ids) - 5} 个指标")
-
-            # 添加消息
-            new_state["messages"].append({
-                "role": "assistant",
-                "content": f"🔍 指标评估完成:发现 {len(metric_ids)} 个待计算指标",
-                "timestamp": datetime.now().isoformat()
-            })
-
-            return convert_numpy_types(new_state)
-
-        except Exception as e:
-            print(f"❌ 指标评估失败: {e}")
-            new_state = state.copy()
-            new_state["errors"].append(f"指标评估错误: {str(e)}")
-            return convert_numpy_types(new_state)
-
     async def _metric_calculator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
         """指标计算节点"""
         try:
@@ -388,11 +358,15 @@ class CompleteAgentFlow:
                     # 找到对应的指标需求
                     metric_req = next((m for m in metrics_requirements if m.metric_id == metric_id), None)
                     if not metric_req:
-                        print(f"⚠️ 找不到指标 {metric_id} 的需求信息,跳过")
-                        # 仍然从待计算列表中移除,避免无限循环
-                        if metric_id in new_state["pending_metric_ids"]:
-                            new_state["pending_metric_ids"].remove(metric_id)
-                        continue
+                        # 修复:找不到指标需求时,创建临时的指标需求结构,避免跳过指标
+                        print(f"⚠️ 指标 {metric_id} 找不到需求信息,创建临时配置继续计算")
+                        metric_req = type('MetricRequirement', (), {
+                            'metric_id': metric_id,
+                            'metric_name': metric_id.replace('metric-', '') if metric_id.startswith('metric-') else metric_id,
+                            'calculation_logic': f'计算 {metric_id}',
+                            'required_fields': ['transactions'],
+                            'dependencies': []
+                        })()
 
                     print(f"🧮 计算指标: {metric_id} - {metric_req.metric_name}")
 
@@ -437,27 +411,55 @@ class CompleteAgentFlow:
                         }
 
                     # 处理计算结果
+                    calculation_success = False
                     for result in results.get("results", []):
                         if result.get("result", {}).get("success"):
                             # 计算成功
                             new_state["computed_metrics"][metric_id] = result["result"]
                             successful_calculations += 1
+                            calculation_success = True
                             print(f"✅ 指标 {metric_id} 计算成功")
+                            break  # 找到一个成功的就算成功
                         else:
                             # 计算失败
                             failed_calculations += 1
                             print(f"❌ 指标 {metric_id} 计算失败")
 
-                    # 从待计算列表中移除(无论成功还是失败)
-                    if metric_id in new_state["pending_metric_ids"]:
-                        new_state["pending_metric_ids"].remove(metric_id)
+                    # 初始化失败尝试记录
+                    if "failed_metric_attempts" not in new_state:
+                        new_state["failed_metric_attempts"] = {}
+
+                    # 根据计算结果处理指标
+                    if calculation_success:
+                        # 计算成功:从待计算列表中移除
+                        if metric_id in new_state["pending_metric_ids"]:
+                            new_state["pending_metric_ids"].remove(metric_id)
+                        # 重置失败计数
+                        new_state["failed_metric_attempts"].pop(metric_id, None)
+                    else:
+                        # 计算失败:记录失败次数,不从待计算列表移除
+                        new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
+                        max_retries = 3
+                        if new_state["failed_metric_attempts"][metric_id] >= max_retries:
+                            print(f"⚠️ 指标 {metric_id} 已达到最大重试次数 ({max_retries}),从待计算列表中移除")
+                            if metric_id in new_state["pending_metric_ids"]:
+                                new_state["pending_metric_ids"].remove(metric_id)
 
                 except Exception as e:
                     print(f"❌ 计算指标 {metric_id} 时发生异常: {e}")
                     failed_calculations += 1
-                    # 即使异常,也要从待计算列表中移除,避免无限循环
-                    if metric_id in new_state["pending_metric_ids"]:
-                        new_state["pending_metric_ids"].remove(metric_id)
+
+                    # 初始化失败尝试记录
+                    if "failed_metric_attempts" not in new_state:
+                        new_state["failed_metric_attempts"] = {}
+
+                    # 记录失败次数
+                    new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
+                    max_retries = 3
+                    if new_state["failed_metric_attempts"][metric_id] >= max_retries:
+                        print(f"⚠️ 指标 {metric_id} 异常已达到最大重试次数 ({max_retries}),从待计算列表中移除")
+                        if metric_id in new_state["pending_metric_ids"]:
+                            new_state["pending_metric_ids"].remove(metric_id)
 
             # 更新计算结果统计
             new_state["calculation_results"] = {
@@ -502,7 +504,7 @@ class CompleteAgentFlow:
             "data_classify": "data_classify",
             "generate_outline": "outline_generator",
             "compute_metrics": "metric_calculator",
-            "finalize_report": "report_finalizer"
+            "finalize_report": END  # 直接结束流程
         }
         return decision_routes.get(decision, "planning_node")