wangyang преди 2 дни
родител
ревизия
49a0b306f8
променени са 2 файла, в които са добавени 57 реда и са изтрити 63 реда
  1. 3 0
      llmops/agents/rules_engine_metric_calculation_agent.py
  2. 54 63
      llmops/complete_agent_flow_rule.py

+ 3 - 0
llmops/agents/rules_engine_metric_calculation_agent.py

@@ -74,6 +74,9 @@ class RulesEngineMetricCalculationAgent:
 
         # 初始化API调用跟踪
         self.api_calls = []
+        # 加载配置文件
+        self.configs = {}
+
 
     def _load_data_files(self) -> Dict[str, str]:
         """加载数据文件映射"""

+ 54 - 63
llmops/complete_agent_flow_rule.py

@@ -41,8 +41,8 @@ from workflow_state import (
     convert_numpy_types,
 
 )
-from agents.outline_agent import  generate_report_outline
-from agents.planning_agent import  plan_next_action
+from agents.outline_agent import OutlineGeneratorAgent, generate_report_outline
+from agents.planning_agent import PlanningAgent, plan_next_action, analyze_current_state
 from agents.rules_engine_metric_calculation_agent import RulesEngineMetricCalculationAgent
 
 
@@ -91,7 +91,7 @@ class CompleteAgentFlow:
 
         # 从各个节点返回规划节点重新决策
         workflow.add_edge("outline_generator", "planning_node")
-        workflow.add_edge("metric_calculator", END)
+        workflow.add_edge("metric_calculator", "planning_node")
 
         return workflow
 
@@ -119,11 +119,6 @@ class CompleteAgentFlow:
             print("→ 路由到 outline_generator(生成大纲)")
             return "outline_generator"
 
-        # 如果指标需求为空但大纲已生成 → 评估指标需求
-        if not state.get("metrics_requirements") and state.get("outline_draft"):
-            print("→ 路由到 metric_evaluator(评估指标需求)")
-            return "metric_evaluator"
-
         # 计算覆盖率
         progress = get_calculation_progress(state)
         coverage = progress["coverage_rate"]
@@ -135,10 +130,22 @@ class CompleteAgentFlow:
             print(f"→ 路由到 metric_calculator(计算指标,覆盖率={coverage:.2%})")
             return "metric_calculator"
 
-        # 如果没有待计算指标或覆盖率 >= 80% → 生成最终报告
-        if not state.get("pending_metric_ids") or coverage >= 0.8:
-            print(f"→ 路由到 report_finalizer(生成最终报告,覆盖率={coverage:.2%})")
-            return "report_finalizer"
+        # 检查是否应该结束流程
+        pending_ids = state.get("pending_metric_ids", [])
+        failed_attempts = state.get("failed_metric_attempts", {})
+        max_retries = 3
+
+        # 计算还有哪些指标可以重试(未达到最大重试次数)
+        retryable_metrics = [
+            mid for mid in pending_ids
+            if failed_attempts.get(mid, 0) < max_retries
+        ]
+
+        # 如果覆盖率 >= 80%,或者没有可重试的指标 → 结束流程
+        if coverage >= 0.8 or not retryable_metrics:
+            reason = "覆盖率达到80%" if coverage >= 0.8 else "没有可重试指标"
+            print(f"→ 结束流程(覆盖率={coverage:.2%},原因:{reason})")
+            return END
 
         # 默认返回规划节点
         return "planning_node"
@@ -263,50 +270,6 @@ class CompleteAgentFlow:
         print('   能够根据具体业务场景动态调整分析框架,确保分析的针对性和有效性。')
         print()
 
-    async def _metric_evaluator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
-        """指标评估节点:根据大纲确定需要计算的指标"""
-        try:
-            print("🔍 正在评估指标需求...")
-
-            new_state = state.copy()
-            outline = state.get("outline_draft")
-
-            if not outline:
-                print("⚠️ 没有大纲信息,跳过指标评估")
-                return convert_numpy_types(new_state)
-
-            # 从大纲中提取指标需求
-            metrics_requirements = outline.global_metrics
-            metric_ids = [m.metric_id for m in metrics_requirements]
-
-            # 设置待计算指标
-            new_state["metrics_requirements"] = metrics_requirements
-            new_state["pending_metric_ids"] = metric_ids.copy()
-            new_state["computed_metrics"] = {}
-            new_state["metrics_cache"] = {}
-
-            print(f"✅ 指标评估完成,发现 {len(metric_ids)} 个待计算指标")
-            for i, metric_id in enumerate(metric_ids[:5], 1):  # 只显示前5个
-                print(f"   {i}. {metric_id}")
-
-            if len(metric_ids) > 5:
-                print(f"   ... 还有 {len(metric_ids) - 5} 个指标")
-
-            # 添加消息
-            new_state["messages"].append({
-                "role": "assistant",
-                "content": f"🔍 指标评估完成:发现 {len(metric_ids)} 个待计算指标",
-                "timestamp": datetime.now().isoformat()
-            })
-
-            return convert_numpy_types(new_state)
-
-        except Exception as e:
-            print(f"❌ 指标评估失败: {e}")
-            new_state = state.copy()
-            new_state["errors"].append(f"指标评估错误: {str(e)}")
-            return convert_numpy_types(new_state)
-
     async def _metric_calculator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
         """指标计算节点"""
         try:
@@ -399,27 +362,55 @@ class CompleteAgentFlow:
                         }
 
                     # 处理计算结果
+                    calculation_success = False
                     for result in results.get("results", []):
                         if result.get("result", {}).get("success"):
                             # 计算成功
                             new_state["computed_metrics"][metric_id] = result["result"]
                             successful_calculations += 1
+                            calculation_success = True
                             print(f"✅ 指标 {metric_id} 计算成功")
+                            break  # 找到一个成功的就算成功
                         else:
                             # 计算失败
                             failed_calculations += 1
                             print(f"❌ 指标 {metric_id} 计算失败")
 
-                    # 从待计算列表中移除(无论成功还是失败)
-                    if metric_id in new_state["pending_metric_ids"]:
-                        new_state["pending_metric_ids"].remove(metric_id)
+                    # 初始化失败尝试记录
+                    if "failed_metric_attempts" not in new_state:
+                        new_state["failed_metric_attempts"] = {}
+
+                    # 根据计算结果处理指标
+                    if calculation_success:
+                        # 计算成功:从待计算列表中移除
+                        if metric_id in new_state["pending_metric_ids"]:
+                            new_state["pending_metric_ids"].remove(metric_id)
+                        # 重置失败计数
+                        new_state["failed_metric_attempts"].pop(metric_id, None)
+                    else:
+                        # 计算失败:记录失败次数,不从待计算列表移除
+                        new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
+                        max_retries = 3
+                        if new_state["failed_metric_attempts"][metric_id] >= max_retries:
+                            print(f"⚠️ 指标 {metric_id} 已达到最大重试次数 ({max_retries}),从待计算列表中移除")
+                            if metric_id in new_state["pending_metric_ids"]:
+                                new_state["pending_metric_ids"].remove(metric_id)
 
                 except Exception as e:
                     print(f"❌ 计算指标 {metric_id} 时发生异常: {e}")
                     failed_calculations += 1
-                    # 即使异常,也要从待计算列表中移除,避免无限循环
-                    if metric_id in new_state["pending_metric_ids"]:
-                        new_state["pending_metric_ids"].remove(metric_id)
+
+                    # 初始化失败尝试记录
+                    if "failed_metric_attempts" not in new_state:
+                        new_state["failed_metric_attempts"] = {}
+
+                    # 记录失败次数
+                    new_state["failed_metric_attempts"][metric_id] = new_state["failed_metric_attempts"].get(metric_id, 0) + 1
+                    max_retries = 3
+                    if new_state["failed_metric_attempts"][metric_id] >= max_retries:
+                        print(f"⚠️ 指标 {metric_id} 异常已达到最大重试次数 ({max_retries}),从待计算列表中移除")
+                        if metric_id in new_state["pending_metric_ids"]:
+                            new_state["pending_metric_ids"].remove(metric_id)
 
             # 更新计算结果统计
             new_state["calculation_results"] = {
@@ -463,7 +454,7 @@ class CompleteAgentFlow:
         decision_routes = {
             "generate_outline": "outline_generator",
             "compute_metrics": "metric_calculator",
-            "finalize_report": "report_finalizer"
+            "finalize_report": END  # 直接结束流程
         }
         return decision_routes.get(decision, "planning_node")