Bläddra i källkod

再更新提交一版
1、删除不必要的代码
2、优化配置抽离到外面
3、增加指标补充处理

wangyang 3 dagar sedan
förälder
incheckning
46ea5db8ff

+ 397 - 126
llmops/agents/outline_agent.py

@@ -38,6 +38,7 @@ import requests
 from datetime import datetime
 
 from pydantic import BaseModel, Field
+from llmops.config import RULES_ENGINE_BASE_URL
 
 
 # 数据模型定义(与现有项目兼容)
@@ -158,30 +159,8 @@ class OutlineGeneratorAgent:
                             "dependencies": []
                         })
 
-        # 注意:现在依赖LLM根据提示词生成包含所有必需指标的大纲,不再在代码中强制添加
-
-        # 如果LLM没有提供任何指标,则自动补充基础指标
-        if not global_metrics:
-            print("⚠️ LLM未提供指标,使用默认基础指标")
-            available_metrics = self._load_available_metrics()
-
-            # 选择前5个基础指标
-            base_metrics = [m for m in available_metrics if m.get('type') == '基础统计指标'][:5]
-
-            for metric in base_metrics:
-                metric_name = metric['name']
-                knowledge_id = f"metric-{metric_name}"
-                if sections:  # 确保有章节
-                    sections[0]["metrics_needed"].append(knowledge_id)  # 添加到第一个章节
-                global_metrics.append({
-                    "metric_id": knowledge_id,
-                    "metric_name": metric_name,
-                    "calculation_logic": f"使用规则引擎计算{metric_name}: {metric.get('description', '')}",
-                    "required_fields": ["transactions"],
-                    "dependencies": []
-                })
-
-        print(f"📊 最终生成 {len(global_metrics)} 个指标")
+        # 完全依赖LLM生成包含所有必需指标的大纲
+        print(f"🤖 大模型生成 {len(global_metrics)} 个指标")
 
         return {
             "report_title": new_format_data.get("chapter_title", "流水分析报告"),
@@ -189,7 +168,7 @@ class OutlineGeneratorAgent:
             "global_metrics": global_metrics
         }
 
-    def create_prompt(self) -> str:
+    def create_prompt(self, question: str, industry: str) -> str:
         """创建大纲生成提示"""
 
         # 从API动态获取可用的指标列表
@@ -203,15 +182,12 @@ class OutlineGeneratorAgent:
         # 构建基础提示词
         base_prompt = f"""[角色定义]
 你的角色是: 流水分析报告的大纲生成模块。
-你的目标是:
-基于输入的流水分析业务背景信息,
-生成一份可交付、结构清晰、可被程序解析的流水分析报告大纲,
-并以结构化 JSON 的形式,明确每个章节及其下属分析主题所需的分析指标与分析项要求,
-以指导后续分析能力的调用。
+你的目标是:{question},生成一份针对{industry}行业的全面的流水分析报告大纲。
+生成结构清晰、可被程序解析的JSON格式大纲,明确每个章节及其下属分析主题所需的分析指标。
 
 [职责边界]
 你只能完成以下事项:
-1.确定流水分析报告应包含的章节结构
+1.确定{industry}流水分析报告应包含的章节结构
 2.明确每个章节下需要覆盖的分析主题
 3.为每个分析主题列出所需的计算指标、统计指标或分析指标
 
@@ -226,42 +202,62 @@ class OutlineGeneratorAgent:
 
 [可用指标总览]
 系统当前支持 {len(available_metrics)} 个指标。
+指标内容为{available_metrics}
 
 [重要要求]
-请根据用户需求和可用指标列表,从上述指标中选择最相关的指标。优先选择基础统计指标和时间分析指标,确保报告的完整性和实用性
+请根据用户需求和可用指标列表,从上述指标中选择最相关的指标。必须基于用户查询的具体需求进行智能匹配,确保选择的指标能够充分满足分析需求
 
 [强制要求]
 生成大纲时,请:
-1. 从可用指标中选择合适的指标组合
-2. 确保选择的指标能够满足用户分析需求
-3. 在metrics_needed数组中列出选定的指标名称
-4. 在global_metrics数组中包含对应指标的详细定义
+1. 仔细分析用户查询,识别所有提到的分析需求点
+2. 从可用指标中选择能够满足这些需求的完整指标组合
+3. 基于语义相关性进行指标筛选,不要过于保守
+4. 在各章节的metrics对象中,按照指标类型(calculation_metrics/statistical_metrics/analysis_metrics)列出选定的指标
+5. 为每个指标提供metric_name和metric_description字段
+6. 优先选择与用户查询直接相关的指标
 
 [可选择的指标列表]
 {metrics_list_text}
 
-[重要兼容性要求]
-虽然你必须使用上述JSON结构输出,但为了确保与现有系统的兼容性,请在输出中额外包含以下字段:
-- 在根级别添加 "report_title": "流水分析报告"
-- 在根级别添加 "global_metrics": [] (空数组或根据实际需求填充指标定义)
-- 确保输出能被现有系统正确解析和使用
+[重要说明]
+请确保:
+- 从提供的可用指标列表中选择最相关的指标
+- 为每个选定的指标提供清晰的名称和描述
+- 输出格式必须严格遵循上述JSON结构
+- 确保选择的指标能够满足用户查询的具体分析需求
 
 [输出格式要求]
 你必须且只能以 JSON 字符串 形式输出分析大纲,不得输出任何解释性自然语言。
 JSON 必须严格遵循以下结构约定:
 {{
-  "chapter_id": "string",
   "chapter_title": "string",
-  "chapter_type": "string",
   "sections": [
     {{
       "section_id": "string",
       "section_title": "string",
       "section_description": "string",
-      "metrics_needed": ["string"]
+      "metrics": {{
+        "calculation_metrics": [
+          {{
+            "metric_name": "string",
+            "metric_description": "string"
+          }}
+        ],
+        "statistical_metrics": [
+          {{
+            "metric_name": "string",
+            "metric_description": "string"
+          }}
+        ],
+        "analysis_metrics": [
+          {{
+            "metric_name": "string",
+            "metric_description": "string"
+          }}
+        ]
+      }}
     }}
-  ],
-  "global_metrics": []
+  ]
 }}"""
 
         return base_prompt
@@ -277,7 +273,7 @@ JSON 必须严格遵循以下结构约定:
 
     async def generate_outline(self, question: str, industry: str, sample_data: List[Dict[str, Any]]) -> ReportOutline:
         """异步生成大纲(修复版:自动补全缺失字段)"""
-        prompt = self.create_prompt()
+        prompt = self.create_prompt(question, industry)
 
         # 在prompt末尾添加业务背景信息
         full_prompt = f"""{prompt}
@@ -355,7 +351,8 @@ JSON 必须严格遵循以下结构约定:
         # 保存API结果到文件
         api_results_dir = "api_results"
         os.makedirs(api_results_dir, exist_ok=True)
-        filename = f"{call_id}.json"
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}_{call_id}.json"
         filepath = os.path.join(api_results_dir, filename)
 
         try:
@@ -370,13 +367,13 @@ JSON 必须严格遵循以下结构约定:
         print("========================================")
 
         # 后处理,补全缺失的section_id和metric_id
-        outline = self._post_process_outline(outline)
+        outline = self._post_process_outline(outline, question, industry)
 
         return outline
 
-    def _post_process_outline(self, outline: ReportOutline) -> ReportOutline:
+    def _post_process_outline(self, outline: ReportOutline, question: str, industry: str) -> ReportOutline:
         """
-        后处理大纲,自动补全缺失的必需字段
+        后处理大纲,自动补全缺失的必需字段,并基于查询优化指标
         """
         # 为章节补全section_id
         for idx, section in enumerate(outline.sections):
@@ -402,6 +399,11 @@ JSON 必须严格遵循以下结构约定:
                     metric.calculation_logic
                 )
 
+        # 基于用户查询进行指标优化筛选
+        if hasattr(outline, 'global_metrics') and outline.global_metrics:
+            print(f"📊 AI选择了 {len(outline.global_metrics)} 个指标,进行智能优化...")
+            outline = self._optimize_metrics_by_query(outline, question, industry)
+
         return outline
 
     def _infer_required_fields(self, logic: str) -> List[str]:
@@ -424,6 +426,293 @@ JSON 必须严格遵循以下结构约定:
 
         return list(set(fields))
 
+    def _optimize_metrics_by_query(self, outline: ReportOutline, question: str, industry: str) -> ReportOutline:
+        """
+        基于用户查询进行智能指标优化
+        """
+        # 获取所有可用指标
+        available_metrics = self._load_available_metrics()
+
+        # 已选择的指标名称集合
+        selected_metric_names = {m.metric_name for m in outline.global_metrics}
+
+        # 基于用户查询进行语义匹配,找出缺失的关键指标
+        query_keywords = self._extract_query_keywords(question, industry)
+        missing_key_metrics = self._find_missing_key_metrics(
+            available_metrics, selected_metric_names, query_keywords
+        )
+
+        # 补充缺失的关键指标
+        supplemented_count = 0
+        for metric_name in missing_key_metrics:
+            # 找到对应的可用指标信息
+            available_metric = next((m for m in available_metrics if m['name'] == metric_name), None)
+            if available_metric:
+                # 创建MetricRequirement对象
+                metric_req = MetricRequirement(
+                    metric_id=f"metric-{metric_name}",
+                    metric_name=metric_name,
+                    calculation_logic=f"使用规则引擎计算{metric_name}",
+                    required_fields=["transactions"],
+                    dependencies=[]
+                )
+                outline.global_metrics.append(metric_req)
+                supplemented_count += 1
+                print(f"  补充关键指标: {metric_name}")
+
+        if supplemented_count > 0:
+            print(f"✅ 基于查询分析补充了 {supplemented_count} 个关键指标,总计 {len(outline.global_metrics)} 个指标")
+
+        # 智能分配章节指标需求
+        self._smart_assign_section_metrics(outline)
+
+        return outline
+
+    def _extract_query_keywords(self, question: str, industry: str) -> List[str]:
+        """
+        通过大模型从用户查询中提取关键词
+
+        Args:
+            question: 用户查询
+            industry: 行业信息
+
+        Returns:
+            关键词列表
+        """
+        try:
+            keyword_prompt = ChatPromptTemplate.from_messages([
+                ("system", """你是一个专业的关键词提取专家,需要从用户查询中提取关键的分析指标和业务术语。
+
+请分析查询内容,识别出用户关心的核心指标、分析维度和业务概念。
+
+返回格式:
+请以逗号分隔的关键词列表形式返回,不要其他解释。
+
+示例:
+收入分析, 支出统计, 交易对手, 时间趋势, 占比分析"""),
+                ("human", """用户查询:{question}
+行业背景:{industry}
+
+请提取这个查询中的关键分析指标和业务术语。""")
+            ])
+
+            chain = keyword_prompt | self.llm
+            result = chain.invoke({
+                "question": question,
+                "industry": industry
+            })
+
+            keywords_text = result.content.strip()
+            # 按逗号分割并清理空白
+            keywords = [kw.strip() for kw in keywords_text.split(',') if kw.strip()]
+
+            print(f"🔍 提取到查询关键词: {keywords}")
+            return keywords
+
+        except Exception as e:
+            print(f"⚠️ 关键词提取失败,使用简单分词: {str(e)}")
+            # 备选方案:简单的文本分词
+            import re
+            # 移除标点符号,提取中文词组
+            text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', question)
+            words = [w for w in text.split() if len(w) > 1]
+            print(f"🔄 备选关键词: {words}")
+            return words
+
+    def _find_missing_key_metrics(self, available_metrics: List[Dict], selected_metric_names: set,
+                                  query_keywords: List[str]) -> List[str]:
+        """
+        基于查询关键词找出缺失的关键指标
+
+        Args:
+            available_metrics: 所有可用指标
+            selected_metric_names: 已选择的指标名称集合
+            query_keywords: 查询关键词
+
+        Returns:
+            缺失的关键指标名称列表
+        """
+        if not query_keywords or not available_metrics:
+            return []
+
+        try:
+            missing_prompt = ChatPromptTemplate.from_messages([
+                ("system", """你是一个专业的指标推荐专家,需要根据用户查询的关键词,识别出可能缺失的关键指标。
+
+请分析:
+1. 用户关心的分析维度(收入、支出、排名、趋势等)
+2. 已选择的指标
+3. 可用的指标库
+
+推荐一些重要的缺失指标,帮助完善分析报告。
+
+返回格式:
+只返回指标名称列表,用换行符分隔,不要其他解释。
+
+示例:
+总收入分析
+支出占比统计
+交易对手排名"""),
+                ("human", """查询关键词:{keywords}
+
+已选择的指标:
+{selected_metrics}
+
+可用指标库:
+{available_metrics}
+
+请推荐一些重要的缺失指标。""")
+            ])
+
+            # 格式化输入
+            selected_list = '\n'.join(selected_metric_names) if selected_metric_names else '无'
+            available_list = '\n'.join([m.get('name', '') for m in available_metrics if m.get('name')])
+
+            chain = missing_prompt | self.llm
+            result = chain.invoke({
+                "keywords": ', '.join(query_keywords),
+                "selected_metrics": selected_list,
+                "available_metrics": available_list
+            })
+
+            # 解析结果
+            recommended_metrics = []
+            for line in result.content.strip().split('\n'):
+                metric_name = line.strip()
+                if metric_name and metric_name not in selected_metric_names:
+                    # 验证指标是否存在于可用指标库中
+                    if any(m.get('name') == metric_name for m in available_metrics):
+                        recommended_metrics.append(metric_name)
+
+            print(f"📊 推荐缺失指标: {recommended_metrics}")
+            return recommended_metrics
+
+        except Exception as e:
+            print(f"⚠️ 指标推荐失败: {str(e)}")
+            return []
+
+    def _smart_assign_section_metrics(self, outline: ReportOutline) -> None:
+        """
+        智能分配章节指标需求
+
+        Args:
+            outline: 报告大纲对象,会被原地修改
+        """
+        if not outline.sections or not outline.global_metrics:
+            return
+
+        try:
+            # 获取所有可用的指标ID
+            available_metric_ids = {m.metric_id for m in outline.global_metrics}
+
+            assign_prompt = ChatPromptTemplate.from_messages([
+                ("system", """你是一个专业的报告结构专家,需要将全局指标智能分配到各个章节。
+
+分配原则:
+1. 每个章节分配3-5个最相关的指标
+2. 指标应与章节内容高度相关
+3. 避免重复分配相同的指标
+4. 优先分配核心指标到主要章节
+
+返回格式:
+为每个章节返回指标ID列表,用分号分隔章节,格式如下:
+章节ID:指标ID1,指标ID2,指标ID3
+
+示例:
+sec_1:metric-total_income,metric-expense_trend,metric-profit_margin
+sec_2:metric-customer_analysis,metric-market_share"""),
+                ("human", """报告标题:{report_title}
+
+章节列表:
+{sections}
+
+可用指标:
+{available_metrics}
+
+请为每个章节分配最合适的指标ID。""")
+            ])
+
+            # 格式化输入
+            sections_text = '\n'.join([
+                f"{section.section_id}: {section.title} - {section.description}"
+                for section in outline.sections
+            ])
+
+            available_metrics_text = '\n'.join([
+                f"{m.metric_id}: {m.metric_name} - {m.calculation_logic}"
+                for m in outline.global_metrics
+            ])
+
+            chain = assign_prompt | self.llm
+            result = chain.invoke({
+                "report_title": outline.report_title,
+                "sections": sections_text,
+                "available_metrics": available_metrics_text
+            })
+
+            # 解析结果并分配指标
+            lines = result.content.strip().split('\n')
+            assigned_metrics = set()  # 避免重复分配
+
+            for line in lines:
+                if ':' not in line:
+                    continue
+
+                section_id, metrics_str = line.split(':', 1)
+                section_id = section_id.strip()
+
+                # 找到对应的章节
+                section = next((s for s in outline.sections if s.section_id == section_id), None)
+                if not section:
+                    continue
+
+                # 解析指标ID列表
+                metric_ids = [mid.strip() for mid in metrics_str.split(',') if mid.strip()]
+
+                # 验证指标ID并分配(避免重复)
+                valid_metrics = []
+                for metric_id in metric_ids:
+                    if metric_id in available_metric_ids and metric_id not in assigned_metrics:
+                        valid_metrics.append(metric_id)
+                        assigned_metrics.add(metric_id)
+
+                    # 每个章节最多分配5个指标
+                    if len(valid_metrics) >= 5:
+                        break
+
+                section.metrics_needed = valid_metrics
+                print(f"📋 章节 '{section.title}' 分配了 {len(valid_metrics)} 个指标: {valid_metrics}")
+
+            # 检查是否有章节没有分配到指标,如果有则平均分配剩余指标
+            unassigned_sections = [s for s in outline.sections if not s.metrics_needed]
+            remaining_metrics = [m.metric_id for m in outline.global_metrics if m.metric_id not in assigned_metrics]
+
+            if unassigned_sections and remaining_metrics:
+                print(f"🔄 为 {len(unassigned_sections)} 个未分配章节平均分配剩余指标")
+                metrics_per_section = max(1, len(remaining_metrics) // len(unassigned_sections))
+
+                for i, section in enumerate(unassigned_sections):
+                    start_idx = i * metrics_per_section
+                    end_idx = min(start_idx + metrics_per_section, len(remaining_metrics))
+                    section.metrics_needed = remaining_metrics[start_idx:end_idx]
+                    print(f"📋 章节 '{section.title}' 分配了 {len(section.metrics_needed)} 个指标: {section.metrics_needed}")
+
+        except Exception as e:
+            print(f"⚠️ 智能指标分配失败,使用平均分配: {str(e)}")
+            # 备选方案:平均分配所有指标到各个章节
+            if outline.sections and outline.global_metrics:
+                all_metric_ids = [m.metric_id for m in outline.global_metrics]
+                metrics_per_section = max(1, len(all_metric_ids) // len(outline.sections))
+
+                for i, section in enumerate(outline.sections):
+                    start_idx = i * metrics_per_section
+                    end_idx = min(start_idx + metrics_per_section, len(all_metric_ids))
+                    section.metrics_needed = all_metric_ids[start_idx:end_idx]
+                    print(f"🔄 备选分配 - 章节 '{section.title}' 分配了 {len(section.metrics_needed)} 个指标")
+
+   
+   
+
     def _load_available_knowledge(self) -> List[Dict[str, Any]]:
         """
         从规则引擎获取可用的知识元数据
@@ -432,7 +721,7 @@ JSON 必须严格遵循以下结构约定:
             知识元数据列表,包含id和description
         """
         try:
-            url = "http://10.192.72.11:31809/api/rules/getKnowledgeMeta"
+            url = f"{RULES_ENGINE_BASE_URL}/api/rules/getKnowledgeMeta"
             headers = {
                 "Accept": "*/*",
                 "Accept-Encoding": "gzip, deflate, br",
@@ -518,7 +807,7 @@ JSON 必须严格遵循以下结构约定:
 
     def _match_metric_to_knowledge(self, metric_name: str, metric_description: str) -> str:
         """
-        根据指标名称和描述匹配最合适的知识ID
+        通过大模型判断指标是否与可用知识匹配
 
         Args:
             metric_name: 指标名称
@@ -530,7 +819,7 @@ JSON 必须严格遵循以下结构约定:
         if not self.available_knowledge:
             return ""
 
-        # 精确匹配:直接用指标名称匹配知识ID
+        # 首先尝试精确匹配:直接用指标名称匹配知识ID
         for knowledge in self.available_knowledge:
             knowledge_id = knowledge.get("id", "")
             # 去掉前缀匹配,如 "metric-分析账户数量" 匹配 "分析账户数量"
@@ -538,84 +827,66 @@ JSON 必须严格遵循以下结构约定:
                 print(f"🔗 精确匹配指标 '{metric_name}' -> 知识ID: {knowledge_id}")
                 return knowledge_id
 
-        # 扩展匹配:匹配更多的农业相关指标
-        if "农业" in metric_name:
-            if "总经营收入" in metric_name:
-                # 匹配农业总经营收入
-                for knowledge in self.available_knowledge:
-                    if knowledge.get("id") == "metric-农业总经营收入":
-                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业总经营收入")
-                        return "metric-农业总经营收入"
-            if "总经营支出" in metric_name:
-                # 匹配农业总经营支出
-                for knowledge in self.available_knowledge:
-                    if knowledge.get("id") == "metric-农业总经营支出":
-                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业总经营支出")
-                        return "metric-农业总经营支出"
-            if "交易对手收入排名TOP3" in metric_name or "收入排名" in metric_name:
-                # 匹配农业交易对手收入TOP3
-                for knowledge in self.available_knowledge:
-                    if knowledge.get("id") == "metric-农业交易对手经营收入top3":
-                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业交易对手经营收入top3")
-                        return "metric-农业交易对手经营收入top3"
-            if "交易对手支出排名TOP3" in metric_name or "支出排名" in metric_name:
-                # 匹配农业交易对手支出TOP3
-                for knowledge in self.available_knowledge:
-                    if knowledge.get("id") == "metric-农业交易对手经营支出top3":
-                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业交易对手经营支出top3")
-                        return "metric-农业交易对手经营支出top3"
-
-        # 如果精确匹配失败,使用关键词匹配
-        keywords = [metric_name]
-        if metric_description:
-            # 从描述中提取关键信息
-            desc_lower = metric_description.lower()
-            if "收入" in metric_name or "收入" in desc_lower:
-                keywords.extend(["收入", "总收入", "经营收入"])
-            if "支出" in metric_name or "支出" in desc_lower:
-                keywords.extend(["支出", "总支出", "经营支出"])
-            if "排名" in metric_name or "top" in desc_lower:
-                keywords.append("排名")
-            if "比例" in metric_name or "占比" in desc_lower:
-                keywords.append("比例")
-            if "时间范围" in metric_name:
-                keywords.append("时间范围")
-            if "账户" in metric_name:
-                keywords.append("账户")
-
-        best_match = None
-        best_score = 0
+        # 使用大模型进行语义匹配
+        match_prompt = ChatPromptTemplate.from_messages([
+            ("system", """你是一个专业的指标匹配专家,需要根据指标名称和描述,从提供的知识库中找到最合适的匹配项。
 
-        for knowledge in self.available_knowledge:
-            knowledge_id = knowledge.get("id", "")
-            knowledge_desc = knowledge.get("description", "").lower()
+请分析指标的语义含义和计算逻辑,判断哪个知识项最匹配。
 
-            # 计算匹配分数
-            score = 0
-            for keyword in keywords:
-                if keyword.lower() in knowledge_desc:
-                    score += 1
+返回格式:
+如果找到匹配:返回知识ID
+如果未找到匹配:返回空字符串 ""
 
-            # 行业匹配加分
-            if "黑色金属" in knowledge_desc and "黑色金属" in metric_name:
-                score += 2
-            if "农业" in knowledge_desc and "农业" in metric_name:
-                score += 2
+只返回知识ID或空字符串,不要其他解释。"""),
+            ("human", """指标信息:
+名称:{metric_name}
+描述:{metric_description}
 
-            # 直接名称匹配加分
-            if metric_name.lower() in knowledge_desc:
-                score += 3
+可用知识库:
+{knowledge_list}
 
-            if score > best_score:
-                best_score = score
-                best_match = knowledge_id
+请判断这个指标是否与知识库中的某个项目匹配。如果匹配,返回对应的知识ID;如果不匹配,返回空字符串。""")
+        ])
 
-        if best_match and best_score > 0:
-            print(f"🔗 关键词匹配指标 '{metric_name}' -> 知识ID: {best_match} (匹配分数: {best_score})")
-            return best_match
+        # 构建知识库描述
+        knowledge_list = "\n".join([
+            f"ID: {k.get('id', '')}\n描述: {k.get('description', '')}"
+            for k in self.available_knowledge
+        ])
 
-        print(f"❌ 指标 '{metric_name}' 未找到匹配的知识ID")
-        return ""
+        try:
+            # 调用大模型进行匹配
+            chain = match_prompt | self.llm
+            result = chain.invoke({
+                "metric_name": metric_name,
+                "metric_description": metric_description or "无描述",
+                "knowledge_list": knowledge_list
+            })
+
+            matched_knowledge_id = result.content.strip()
+
+            # 验证返回的知识ID是否存在于可用知识中
+            if matched_knowledge_id and any(k.get("id") == matched_knowledge_id for k in self.available_knowledge):
+                print(f"🤖 大模型匹配指标 '{metric_name}' -> 知识ID: {matched_knowledge_id}")
+                return matched_knowledge_id
+            else:
+                print(f"❌ 大模型未找到指标 '{metric_name}' 的匹配项")
+                return ""
+
+        except Exception as e:
+            print(f"⚠️ 大模型匹配失败,使用备选方案: {str(e)}")
+            # 备选方案:简单的关键词匹配(不包含特定业务逻辑)
+            for knowledge in self.available_knowledge:
+                knowledge_id = knowledge.get("id", "")
+                knowledge_desc = knowledge.get("description", "").lower()
+
+                # 检查指标名称是否在知识描述中出现
+                if metric_name.lower() in knowledge_desc:
+                    print(f"🔄 备选匹配指标 '{metric_name}' -> 知识ID: {knowledge_id}")
+                    return knowledge_id
+
+            print(f"❌ 指标 '{metric_name}' 未找到匹配的知识ID")
+            return ""
 
 
 async def generate_report_outline(question: str, industry: str, sample_data: List[Dict[str, Any]], api_key: str, max_retries: int = 3, retry_delay: float = 2.0) -> ReportOutline:
@@ -674,7 +945,7 @@ async def generate_report_outline(question: str, industry: str, sample_data: Lis
     print("⚠️ 所有重试均失败,使用默认大纲结构")
 
     # 获取实际可用的指标来构建默认大纲
-    available_metrics = self._load_available_metrics()
+    available_metrics = agent._load_available_metrics()
 
     # 选择一些基础指标作为默认值
     default_metric_ids = []

+ 11 - 90
llmops/agents/planning_agent.py

@@ -1,31 +1,3 @@
-"""
-规划Agent (Planning Agent)
-=========================
-
-此Agent负责分析当前状态并做出智能决策,决定下一步行动。
-
-核心功能:
-1. 状态评估:分析大纲、指标计算进度和完整性
-2. 决策制定:决定生成大纲、计算指标、完成报告或澄清需求
-3. 优先级排序:确定最关键的任务和指标
-4. 流程控制:管理整个报告生成工作流的执行顺序
-
-决策逻辑:
-- 大纲为空 → 生成大纲
-- 指标覆盖率 < 80% → 计算指标
-- 指标覆盖率 ≥ 80% → 生成报告
-- 需求模糊 → 澄清需求
-
-技术实现:
-- 使用LangChain和结构化输出
-- 支持异步处理
-- 智能状态评估
-- 灵活的决策机制
-
-作者: Big Agent Team
-版本: 1.0.0
-创建时间: 2024-12-20
-"""
 
 from typing import List, Dict, Optional, Any, Union
 from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
@@ -119,11 +91,9 @@ class PlanningAgent:
         return ChatPromptTemplate.from_messages([
             ("system", """你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。
 
-### 决策选项(选一)
+### 决策选项(选一)
 1. generate_outline:大纲未生成或大纲无效
-2. compute_metrics:大纲已生成但指标未完成(覆盖率<80%)
-3. finalize_report:指标覆盖率≥80%,信息充足
-4. clarify_requirements:用户需求模糊,缺少关键信息
+2. compute_metrics:大纲已生成但指标未完成
 
 ### 决策规则(按顺序检查)
 1. 检查 outline_draft 是否为空 → 空则选择 generate_outline
@@ -145,10 +115,8 @@ class PlanningAgent:
 ### 输出字段说明
 - decision: 决策字符串
 - reasoning: 决策原因说明
-- next_actions: 动作列表(可选)
-- metrics_to_compute: 待计算指标ID列表,必须从状态信息中的可用指标ID中选择(决策为compute_metrics时必须提供)
-- priority_metrics: 优先级指标列表(前2-3个最重要的指标)
-- additional_requirements: 额外需求(可选)
+- metrics_to_compute: 待计算指标ID列表,必须从状态信息中的"有效待计算指标ID列表"中选择。选择所有可用指标,除非指标数量过多(>10个)需要分批计算
+- priority_metrics: 优先级指标列表(前2-3个最重要的指标),从metrics_to_compute中选择
 
 必须输出有效的JSON格式!"""),
 
@@ -266,7 +234,8 @@ class PlanningAgent:
         # 保存API结果到文件
         api_results_dir = "api_results"
         os.makedirs(api_results_dir, exist_ok=True)
-        filename = f"{call_id}.json"
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}_{call_id}.json"
         filepath = os.path.join(api_results_dir, filename)
 
         try:
@@ -301,9 +270,10 @@ class PlanningAgent:
 
         # 获取可用的指标ID
         available_metric_ids = []
-        if state.get('outline_draft') and state.get('outline_draft').get('global_metrics'):
-            available_metric_ids = [m.get('metric_id', '') for m in state['outline_draft']['global_metrics']]
-            available_metric_ids = [mid for mid in available_metric_ids if mid]  # 过滤空值
+        outline_draft = state.get('outline_draft')
+        if outline_draft and outline_draft.global_metrics:
+            available_metric_ids = [m.metric_id for m in outline_draft.global_metrics if m.metric_id]
+        
 
         return f"""当前状态评估:
 - 规划步骤: {state.get('planning_step', 0)}
@@ -403,53 +373,4 @@ async def plan_next_action(question: str, industry: str, current_state: Dict[str
             priority_metrics=[]
         )
 
-    def _get_default_decision(self, current_state: Dict[str, Any]) -> PlanningDecision:
-        """
-        基于状态分析的默认决策逻辑
-
-        Args:
-            current_state: 当前状态信息
-
-        Returns:
-            默认规划决策
-        """
-        state_analysis = analyze_current_state(current_state)
-
-        if not state_analysis["has_outline"]:
-            default_decision = PlanningDecision(
-                decision="generate_outline",
-                reasoning="大纲不存在,需要先生成大纲",
-                next_actions=["生成报告大纲"],
-                metrics_to_compute=[],
-                priority_metrics=[]
-            )
-        elif state_analysis["coverage"] < 0.8 and state_analysis["valid_pending_metrics"]:
-            # 计算指标 - 使用实际的指标ID
-            metrics_to_compute = state_analysis["valid_pending_ids"][:5]  # 最多计算5个
-            default_decision = PlanningDecision(
-                decision="compute_metrics",
-                reasoning=f"指标覆盖率{state_analysis['coverage']:.1%},需要计算更多指标",
-                next_actions=[f"计算指标: {', '.join(metrics_to_compute)}"],
-                metrics_to_compute=metrics_to_compute,
-                priority_metrics=metrics_to_compute[:2]  # 前2个为优先级
-            )
-        elif state_analysis["valid_pending_ids"]:
-            # 还有指标但都失败了,生成报告
-            default_decision = PlanningDecision(
-                decision="finalize_report",
-                reasoning="部分指标计算失败,但已有足够信息生成报告",
-                next_actions=["生成最终报告"],
-                metrics_to_compute=[],
-                priority_metrics=[]
-            )
-        else:
-            # 信息充足,生成报告
-            default_decision = PlanningDecision(
-                decision="finalize_report",
-                reasoning="所有必要指标已计算完成",
-                next_actions=["生成最终报告"],
-                metrics_to_compute=[],
-                priority_metrics=[]
-            )
-
-        return default_decision
+   

+ 16 - 39
llmops/agents/rules_engine_metric_calculation_agent.py

@@ -55,27 +55,16 @@ from langchain_openai import ChatOpenAI
 from langchain_core.prompts import ChatPromptTemplate
 import re
 
+from llmops.config import RULES_ENGINE_BASE_URL
+
 
 class RulesEngineMetricCalculationAgent:
     """规则引擎指标计算Agent"""
 
     def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
-        """
-        初始化规则引擎指标计算Agent
 
-        Args:
-            api_key: DeepSeek API密钥
-            base_url: DeepSeek API基础URL
-        """
-        self.llm = ChatOpenAI(
-            model="deepseek-chat",
-            api_key=api_key,
-            base_url=base_url,
-            temperature=0.1
-        )
 
-        # 加载配置文件
-        self.configs = self._load_configs()
+
 
         # 获取可用的知识元数据
         self.available_knowledge = self._load_available_knowledge()
@@ -143,23 +132,7 @@ class RulesEngineMetricCalculationAgent:
             print(f"加载数据文件 {data_file_path} 失败: {e}")
             return []
 
-    def _load_configs(self) -> Dict[str, Dict]:
-        """加载所有规则引擎配置文件"""
-        configs = {}
-        json_dir = "json_files"
 
-        if os.path.exists(json_dir):
-            for file in os.listdir(json_dir):
-                if file.endswith('.json') and '规则引擎' in file:
-                    try:
-                        with open(os.path.join(json_dir, file), 'r', encoding='utf-8') as f:
-                            config = json.load(f)
-                            key = file.replace('.json', '')
-                            configs[key] = config
-                    except Exception as e:
-                        print(f"加载规则引擎配置文件 {file} 失败: {e}")
-
-        return configs
 
     def _load_available_knowledge(self) -> List[Dict[str, Any]]:
         """
@@ -169,7 +142,7 @@ class RulesEngineMetricCalculationAgent:
             知识元数据列表,包含id、description和inputField
         """
         try:
-            url = "http://10.192.72.11:31809/api/rules/getKnowledgeMeta"
+            url = f"{RULES_ENGINE_BASE_URL}/api/rules/getKnowledgeMeta"
             headers = {
                 "Accept": "*/*",
                 "Accept-Encoding": "gzip, deflate, br",
@@ -179,7 +152,7 @@ class RulesEngineMetricCalculationAgent:
             }
 
             response = requests.post(url, headers=headers, json={}, timeout=30)
-            print(f"访问知识接口:{response}")
+
             if response.status_code == 200:
                 knowledge_meta = response.json()
                 if isinstance(knowledge_meta, list):
@@ -290,8 +263,8 @@ class RulesEngineMetricCalculationAgent:
 
             # 规则引擎API配置
             method = "POST"
-            url = "http://10.192.72.11:31809/api/rules/executeKnowledge"
-            # url = "http://10.192.72.11:31809/api/rules/executeKnowledge"
+
+            url = f"{RULES_ENGINE_BASE_URL}/api/rules/executeKnowledge"
             headers = {
                 "Accept": "*/*",
                 "Accept-Encoding": "gzip, deflate, br",
@@ -342,7 +315,8 @@ class RulesEngineMetricCalculationAgent:
                     # 保存API结果到文件
                     api_results_dir = "api_results"
                     os.makedirs(api_results_dir, exist_ok=True)
-                    filename = f"{call_id}.json"
+                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+                    filename = f"{timestamp}_{call_id}.json"
                     filepath = os.path.join(api_results_dir, filename)
 
                     try:
@@ -388,7 +362,8 @@ class RulesEngineMetricCalculationAgent:
                     # 保存API结果到文件
                     api_results_dir = "api_results"
                     os.makedirs(api_results_dir, exist_ok=True)
-                    filename = f"{call_id}.json"
+                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+                    filename = f"{timestamp}_{call_id}.json"
                     filepath = os.path.join(api_results_dir, filename)
 
                     try:
@@ -589,7 +564,7 @@ class RulesEngineMetricCalculationAgent:
 
         # 规则引擎API配置
         method = "POST"
-        url = "http://10.192.72.11:31809/api/rules/executeKnowledge"
+        url = f"{RULES_ENGINE_BASE_URL}/api/rules/executeKnowledge"
         headers = {
             "Accept": "*/*",
             "Accept-Encoding": "gzip, deflate, br",
@@ -663,7 +638,8 @@ class RulesEngineMetricCalculationAgent:
                     # 保存API结果到文件
                     api_results_dir = "api_results"
                     os.makedirs(api_results_dir, exist_ok=True)
-                    filename = f"{call_id}.json"
+                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+                    filename = f"{timestamp}_{call_id}.json"
                     filepath = os.path.join(api_results_dir, filename)
 
                     try:
@@ -709,7 +685,8 @@ class RulesEngineMetricCalculationAgent:
                     # 保存API结果到文件
                     api_results_dir = "api_results"
                     os.makedirs(api_results_dir, exist_ok=True)
-                    filename = f"{call_id}.json"
+                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+                    filename = f"{timestamp}_{call_id}.json"
                     filepath = os.path.join(api_results_dir, filename)
 
                     try:

+ 74 - 101
llmops/complete_agent_flow_rule.py

@@ -31,24 +31,19 @@ import asyncio
 from typing import Dict, Any, List
 from datetime import datetime
 from langgraph.graph import StateGraph, START, END
-from langchain_core.messages import HumanMessage
 
 from workflow_state import (
     IntegratedWorkflowState,
     create_initial_integrated_state,
-    is_state_ready_for_calculation,
     get_calculation_progress,
     update_state_with_outline_generation,
     update_state_with_planning_decision,
-    finalize_state_with_report,
     convert_numpy_types,
-    MetricRequirement,
-    ReportOutline
+
 )
-from llmops.agents.outline_agent import OutlineGeneratorAgent, generate_report_outline
-from llmops.agents.planning_agent import PlanningAgent, plan_next_action, analyze_current_state
-from llmops.agents.metric_calculation_agent import MetricCalculationAgent
-from llmops.agents.rules_engine_metric_calculation_agent import RulesEngineMetricCalculationAgent
+from agents.outline_agent import  generate_report_outline
+from agents.planning_agent import  plan_next_action
+from agents.rules_engine_metric_calculation_agent import RulesEngineMetricCalculationAgent
 
 
 class CompleteAgentFlow:
@@ -65,10 +60,7 @@ class CompleteAgentFlow:
         self.api_key = api_key
         self.base_url = base_url
 
-        # 初始化各个智能体
-        self.planning_agent = PlanningAgent(api_key, base_url)
-        self.outline_agent = OutlineGeneratorAgent(api_key, base_url)
-        # self.metric_agent = MetricCalculationAgent(api_key, base_url)
+        # 初始规则引擎智能体
         self.rules_engine_agent = RulesEngineMetricCalculationAgent(api_key, base_url)
 
         # 创建工作流图
@@ -81,9 +73,7 @@ class CompleteAgentFlow:
         # 添加节点
         workflow.add_node("planning_node", self._planning_node)
         workflow.add_node("outline_generator", self._outline_generator_node)
-        workflow.add_node("metric_evaluator", self._metric_evaluator_node)
         workflow.add_node("metric_calculator", self._metric_calculator_node)
-        workflow.add_node("report_finalizer", self._report_finalizer_node)
 
         # 设置入口点
         workflow.set_entry_point("planning_node")
@@ -94,18 +84,14 @@ class CompleteAgentFlow:
             self._route_from_planning,
             {
                 "outline_generator": "outline_generator",
-                "metric_evaluator": "metric_evaluator",
                 "metric_calculator": "metric_calculator",
-                "report_finalizer": "report_finalizer",
                 END: END
             }
         )
 
         # 从各个节点返回规划节点重新决策
         workflow.add_edge("outline_generator", "planning_node")
-        workflow.add_edge("metric_evaluator", "planning_node")
-        workflow.add_edge("metric_calculator", "planning_node")
-        workflow.add_edge("report_finalizer", END)
+        workflow.add_edge("metric_calculator", END)
 
         return workflow
 
@@ -205,7 +191,7 @@ class CompleteAgentFlow:
                 industry=state["industry"],
                 sample_data=state["data_set"][:3],  # 使用前3个样本
                 api_key=self.api_key,
-                max_retries=1,  # 最多重试5次
+                max_retries=3,  # 最多重试5次
                 retry_delay=3.0  # 每次重试间隔3秒
             )
 
@@ -214,6 +200,10 @@ class CompleteAgentFlow:
 
             print(f"✅ 大纲生成完成:{outline.report_title}")
             print(f"   包含 {len(outline.sections)} 个章节,{len(outline.global_metrics)} 个指标需求")
+
+            # 分析并打印AI的指标选择推理过程
+            self._print_ai_selection_analysis(outline)
+
             return convert_numpy_types(new_state)
 
         except Exception as e:
@@ -222,6 +212,57 @@ class CompleteAgentFlow:
             new_state["errors"].append(f"大纲生成错误: {str(e)}")
             return convert_numpy_types(new_state)
 
+    def _print_ai_selection_analysis(self, outline):
+        """打印AI指标选择的推理过程分析 - 完全通用版本"""
+        print()
+        print('╔══════════════════════════════════════════════════════════════════════════════╗')
+        print('║                          🤖 AI指标选择分析                                    ║')
+        print('╚══════════════════════════════════════════════════════════════════════════════╝')
+        print()
+
+        # 计算总指标数 - outline可能是字典格式,需要适配
+        if hasattr(outline, 'sections'):
+            # Pydantic模型格式
+            total_metrics = sum(len(section.metrics_needed) for section in outline.sections)
+            sections = outline.sections
+        else:
+            # 字典格式
+            total_metrics = sum(len(section.get('metrics_needed', [])) for section in outline.get('sections', []))
+            sections = outline.get('sections', [])
+
+        # 获取可用指标总数(这里可以从状态或其他地方动态获取)
+        available_count = 26  # 这个可以从API调用中动态获取
+
+        print('📊 选择统计:')
+        print('   ┌─────────────────────────────────────────────────────────────────────┐')
+        print('   │  系统可用指标: {}个   │  AI本次选择: {}个   │  选择率: {:.1f}%     │'.format(
+            available_count, total_metrics, total_metrics/available_count*100 if available_count > 0 else 0))
+        print('   └─────────────────────────────────────────────────────────────────────┘')
+        print()
+
+        print('📋 AI决策过程:')
+        print('   大模型已根据用户需求从{}个可用指标中选择了{}个最相关的指标。'.format(available_count, total_metrics))
+        print('   选择过程完全由大模型基于语义理解和业务逻辑进行,不涉及任何硬编码规则。')
+        print()
+
+        print('🔍 选择结果:')
+        print('   • 总章节数: {}个'.format(len(sections)))
+        print('   • 平均每章节指标数: {:.1f}个'.format(total_metrics/len(sections) if sections else 0))
+        print('   • 选择策略: 基于用户需求的相关性分析')
+        print()
+
+        print('🎯 AI Agent核心能力:')
+        print('   • 语义理解: 理解用户查询的业务意图和分析需求')
+        print('   • 智能筛选: 从海量指标中挑选最相关的组合')
+        print('   • 逻辑推理: 为每个分析维度提供充分的选择依据')
+        print('   • 动态适配: 根据不同场景自动调整选择策略')
+        print()
+
+        print('💡 关键洞察:')
+        print('   AI Agent通过大模型的推理能力,实现了超越传统规则引擎的智能化指标选择,')
+        print('   能够根据具体业务场景动态调整分析框架,确保分析的针对性和有效性。')
+        print()
+
     async def _metric_evaluator_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
         """指标评估节点:根据大纲确定需要计算的指标"""
         try:
@@ -281,7 +322,15 @@ class CompleteAgentFlow:
                 print("🧮 正在执行指标计算...")
 
             new_state = state.copy()
-            pending_ids = state.get("pending_metric_ids", [])
+
+            # 使用规划决策指定的指标批次,如果没有指定则使用所有待计算指标
+            current_batch = state.get("current_batch_metrics", [])
+            if current_batch:
+                pending_ids = current_batch
+                print(f"🧮 本次计算批次包含 {len(pending_ids)} 个指标")
+            else:
+                pending_ids = state.get("pending_metric_ids", [])
+                print(f"🧮 计算所有待计算指标,共 {len(pending_ids)} 个")
 
             if not pending_ids:
                 print("⚠️ 没有待计算的指标")
@@ -408,84 +457,6 @@ class CompleteAgentFlow:
             new_state["errors"].append(f"指标计算错误: {str(e)}")
             return convert_numpy_types(new_state)
 
-    async def _report_finalizer_node(self, state: IntegratedWorkflowState) -> IntegratedWorkflowState:
-        """报告完成节点:生成最终报告"""
-        try:
-            print("📋 正在生成最终报告...")
-
-            # 获取大纲和计算结果
-            outline = state.get("outline_draft")
-            computed_metrics = state.get("computed_metrics", {})
-
-            if not outline:
-                raise ValueError("没有可用的报告大纲")
-
-            # 生成最终报告
-            final_report = {
-                "title": outline.report_title,
-                "generated_at": datetime.now().isoformat(),
-                "summary": {
-                    "total_sections": len(outline.sections),
-                    "total_metrics_required": len(outline.global_metrics),
-                    "total_metrics_computed": len(computed_metrics),
-                    "planning_steps": state.get("planning_step", 0),
-                    "completion_rate": len(computed_metrics) / len(outline.global_metrics) if outline.global_metrics else 0
-                },
-                "sections": [],
-                "metrics_detail": {}
-            }
-
-            # 构建章节内容
-            for section in outline.sections:
-                section_content = {
-                    "section_id": section.section_id,
-                    "title": section.title,
-                    "description": section.description,
-                    "metrics": {}
-                }
-
-                # 添加该章节的指标数据
-                for metric_id in section.metrics_needed:
-                    if metric_id in computed_metrics:
-                        section_content["metrics"][metric_id] = computed_metrics[metric_id]
-                    else:
-                        section_content["metrics"][metric_id] = "数据缺失"
-
-                final_report["sections"].append(section_content)
-
-            # 添加详细的指标信息
-            for metric_req in outline.global_metrics:
-                metric_id = metric_req.metric_id
-                final_report["metrics_detail"][metric_id] = {
-                    "name": metric_req.metric_name,
-                    "logic": metric_req.calculation_logic,
-                    "required_fields": metric_req.required_fields,
-                    "computed": metric_id in computed_metrics,
-                    "value": computed_metrics.get(metric_id, {}).get("value", "N/A")
-                }
-
-            # 更新状态
-            new_state = finalize_state_with_report(state, final_report)
-
-            # 添加完成消息
-            new_state["messages"].append({
-                "role": "assistant",
-                "content": f"🎉 完整报告生成流程完成:{outline.report_title}",
-                "timestamp": datetime.now().isoformat()
-            })
-
-            print(f"✅ 最终报告生成完成:{outline.report_title}")
-            print(f"   章节数:{len(final_report['sections'])}")
-            print(f"   计算指标:{len(computed_metrics)}/{len(outline.global_metrics)}")
-            print(".2%")
-
-            return convert_numpy_types(new_state)
-
-        except Exception as e:
-            print(f"❌ 报告完成失败: {e}")
-            new_state = state.copy()
-            new_state["errors"].append(f"报告完成错误: {str(e)}")
-            return convert_numpy_types(new_state)
 
     def _decision_to_route(self, decision: str) -> str:
         """将规划决策转换为路由"""
@@ -622,7 +593,7 @@ async def main():
     # 测试数据
     test_data = [
         {
-           
+
         }
     ]
 
@@ -633,6 +604,8 @@ async def main():
     result = await run_complete_agent_flow(
         question="请生成一份详细的农业经营贷流水分析报告,需要包含:1.总收入和总支出统计 2.收入笔数和支出笔数 3.各类型收入支出占比分析 4.交易对手收入支出TOP3排名 5.按月份的收入支出趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标",
         industry = "农业",
+        # question="请生成一份详细的黑色金属相关经营贷流水分析报告,需要包含:1.总收入统计 2.收入笔数 3.各类型收入占比分析 4.交易对手收入排名 5.按月份的收入趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标",
+        # industry = "黑色金属",
         data=test_data,
         api_key=config.DEEPSEEK_API_KEY,
         session_id="direct-test"

+ 6 - 0
llmops/config.py

@@ -38,6 +38,12 @@ DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
 DEEPSEEK_BASE_URL = os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
 
 # ============================================================================
+# 规则引擎 API 配置
+# ============================================================================
+# 规则引擎基础URL配置 - 用于指标计算和知识库访问
+RULES_ENGINE_BASE_URL = os.getenv("RULES_ENGINE_BASE_URL", "http://localhost:8081")
+
+# ============================================================================
 # 项目路径配置
 # ============================================================================
 # 定义项目中各个功能模块的数据存储路径