Переглянути джерело

Merge branch 'master' of http://git.yangzhiqiang.tech/jiaqiang/tx_flow_analysis

jiaqiang 1 місяць тому
батько
коміт
62b027fef7
2 змінених файлів з 469 додано та 14 видалено
  1. 115 14
      llmops/agents/data_stardard.py
  2. 354 0
      llmops/folder_scanner.py

+ 115 - 14
llmops/agents/data_stardard.py

@@ -1,10 +1,12 @@
 import os
-import json
+import time
 import asyncio
 import io
 import csv
 import datetime
 import httpx
+import json
+import uuid
 
 # --- LangChain Imports ---
 from langchain_openai import ChatOpenAI
@@ -23,9 +25,6 @@ class TransactionParserAgent:
             base_url=base_url,
             temperature=0.1,
             max_retries=3,  # LangChain 内置重试机制
-            model_kwargs={
-                "response_format": {"type": "json_object"}  # 强制 JSON 模式
-            },
             # 配置 httpx 客户端以优化超时和连接 (LangChain 允许透传 http_client)
             http_client=httpx.Client(
                 timeout=httpx.Timeout(300.0, read=300.0, connect=60.0),
@@ -37,29 +36,40 @@ class TransactionParserAgent:
         # 定义 JSON 解析器
         self.parser = JsonOutputParser()
 
+        # 初始化API调用跟踪
+        self.api_calls = []
+
     async def _invoke_miner_u(self, file_path: str) -> str:
         """调用 MinerU 并提取纯行数据 (保持 httpx 调用不变,因为这不是 LLM)"""
-        print(f"🚀 MinerU 解析中: {os.path.basename(file_path)}")
+        miner_start_time = time.perf_counter()
+        print("\n" + "=" * 40)
+        print("📌 【步骤1 - 数据提取】 开始执行")
+        dealRows = 0
         try:
             # MinerU 是独立服务,继续使用原生 httpx
             async with httpx.AsyncClient() as client:
                 with open(file_path, 'rb') as f:
                     files = {'file': (os.path.basename(file_path), f)}
                     data = {'folderId': 'text'}
+                    print("🔄数据提取中...")
                     response = await client.post(self.multimodal_api_url, files=files, data=data, timeout=120.0)
-
                 if response.status_code == 200:
                     res_json = response.json()
                     full_md_list = []
                     for element in res_json.get('convert_json', []):
                         if 'md' in element:
                             full_md_list.append(element['md'])
+                        if 'rows' in element:
+                            dealRows+=len(element['rows'])
+                    print(f"📊 提取结果:共提取 {dealRows-1} 条数据")
                     return "\n\n".join(full_md_list)
                 return ""
         except Exception as e:
             print(f"❌ MinerU 调用异常: {e}")
             return ""
-
+        finally:
+            print(f"✅ 【步骤1 - 数据提取】 执行完成")
+            print(f"⏱️  执行耗时:{ time.perf_counter() - miner_start_time:.2f} 秒")
     def _get_csv_prompt_template(self) -> ChatPromptTemplate:
         """
         构造 LangChain 的 Prompt 模板
@@ -90,6 +100,12 @@ class TransactionParserAgent:
   1. 严格返回一个包含对象的 JSON 数组。
   2. 每个对象必须包含上述 8 个字段名作为 Key。
   3. 不要输出任何解释文字或 Markdown 代码块标签。
+  
+# Anti-Hallucination Rules
+- 不得根据上下文推断任何未在原始数据中明确出现的字段
+- 不得计算或猜测余额
+- 不得根据常识补全对手方名称
+- 若字段缺失,必须返回空字符串 ""  
 """
         user_template = """# Input Data
 {chunk_data}
@@ -107,10 +123,12 @@ JSON Array:
         md_text = await self._invoke_miner_u(file_path)
         if not md_text:
             return ""
-
+        # 记录开始时间(使用time.perf_counter获取高精度时间)
+        switch_start_time = time.perf_counter()
+        print("\n" + "=" * 40)
+        print("📌 【步骤2 - 标准化转换】 开始执行")
         # 初步切分
         raw_lines = md_text.splitlines()
-
         # 提取真正的第一行作为基准表头
         clean_lines = [l.strip() for l in raw_lines if l.strip()]
         if len(clean_lines) < 2: return ""
@@ -154,17 +172,88 @@ JSON Array:
             chunk = data_rows[i: i + batch_size]
             context_chunk = [table_header] + chunk
             chunk_str = "\n".join(context_chunk)
-
-            print(f"🔄 正在转换批次 {i // batch_size + 1},包含 {len(chunk)} 条数据...")
+            # 1. 记录开始时间(使用time.perf_counter获取高精度时间)
+            start_time = time.perf_counter()
+            print(f"🔄 正在通过LLM转换批次 {i // batch_size + 1},包含 {len(chunk)} 条数据...")
             # print(f"待转换的数据块:\n{chunk_str}")
             try:
                 # --- LangChain 调用 ---
                 # 使用 ainvoke 异步调用链
+                # 记录API调用开始时间
+                call_start_time = datetime.datetime.now()
+
                 data_data = await chain.ainvoke({
                     "start_id": global_tx_counter,
                     "chunk_data": chunk_str
                 })
 
+                # 记录API调用结束时间
+                call_end_time = datetime.datetime.now()
+
+                # 记录API调用结果 - 简化版:只保存提示词和结果数据
+                call_id = f"api_llm_数据转换_{'{:.2f}'.format((call_end_time - call_start_time).total_seconds())}"
+
+                # 从chain中提取提示词(如果可能)
+                prompt_content = ""
+                try:
+                    # 尝试从chain获取最后的消息内容
+                    if hasattr(chain, 'get_prompts'):
+                        prompts = chain.get_prompts()
+                        if prompts:
+                            prompt_content = str(prompts[-1])
+                    else:
+                        # 如果无法获取,构造基本的提示词信息
+                        prompt_content = f"转换批次数据,start_id: {global_tx_counter}, chunk_data: {chunk_str[:200]}..."
+                except:
+                    prompt_content = f"转换批次数据,start_id: {global_tx_counter}, chunk_data: {chunk_str[:200]}..."
+
+                api_call_info = {
+                    "call_id": call_id,
+                    "start_time": call_start_time.isoformat(),
+                    "end_time": call_end_time.isoformat(),
+                    "duration": (call_end_time - call_start_time).total_seconds(),
+                    "prompt": prompt_content,
+                    "input_params": {
+                        "start_id": global_tx_counter,
+                        "chunk_data": chunk_str
+                    },
+                    "llm_result": data_data
+                }
+                self.api_calls.append(api_call_info)
+
+                # 保存API结果到文件 (Markdown格式,更易阅读)
+                # 使用运行ID创建独立的文件夹
+                run_id = os.environ.get('FLOW_RUN_ID', 'default')
+                api_results_dir = f"api_results_{run_id}"
+                os.makedirs(api_results_dir, exist_ok=True)
+                timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
+                filename = f"{timestamp}_{call_id}.md"
+                filepath = os.path.join(api_results_dir, filename)
+
+                try:
+                    with open(filepath, 'w', encoding='utf-8') as f:
+                        f.write("# 数据转换结果\n\n")
+                        f.write("## 调用信息\n\n")
+                        f.write(f"- 调用ID: {call_id}\n")
+                        f.write(f"- 开始时间: {call_start_time.isoformat()}\n")
+                        f.write(f"- 结束时间: {call_end_time.isoformat()}\n")
+                        f.write(f"- 执行时长: {(call_end_time - call_start_time).total_seconds():.2f} 秒\n")
+                        f.write("\n## 提示词入参\n\n")
+                        f.write("```\n")
+                        f.write(api_call_info["prompt"])
+                        f.write("\n```\n\n")
+                        f.write("## 输入参数\n\n")
+                        f.write("```json\n")
+                        f.write(json.dumps(api_call_info["input_params"], ensure_ascii=False, indent=2))
+                        f.write("\n```\n\n")
+                        f.write("## LLM返回结果\n\n")
+                        f.write("```json\n")
+                        f.write(json.dumps(api_call_info["llm_result"], ensure_ascii=False, indent=2))
+                        f.write("\n```\n")
+                    print(f"[API_RESULT] 保存API结果文件: {filepath}")
+                except Exception as e:
+                    print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
+
                 # print(f"💡 LLM 返回数据: {data_data}")
 
                 # 兼容处理:LangChain Parser 通常会直接返回 List 或 Dict
@@ -206,7 +295,12 @@ JSON Array:
 
             except Exception as e:
                 print(f"⚠️ 批次执行失败: {e}")
-
+            finally:
+                end_time = time.perf_counter()
+                elapsed_time = end_time - start_time
+                print(f"⏱️  执行耗时: {elapsed_time:.2f} 秒")
+        print(f"📊 转换结果:共转换 {global_tx_counter - 1} 条数据")
+        print(f"✅ 【步骤2 - 标准化转换】 执行完成")
         return csv_content
 
     async def parse_and_save_to_file(self, file_path: str, output_dir: str = "output") -> str:
@@ -237,11 +331,15 @@ JSON Array:
         """
         标准 Workflow 入口方法
         """
+        # 1. 记录开始时间(使用time.perf_counter获取高精度时间)
+        start_time = time.perf_counter()
+        print(f"BEGIN---数据标准化任务开始---")
         try:
             print(f"待执行标准化的文件:{input_file_path}")
             api_results_dir = "data_files"
             saved_path = await self.parse_and_save_to_file(input_file_path, api_results_dir)
 
+
             return {
                 "status": "success",
                 "file_path": saved_path,
@@ -253,7 +351,11 @@ JSON Array:
                 "status": "error",
                 "message": str(e)
             }
-
+        finally:
+            end_time = time.perf_counter()
+            elapsed_time = end_time - start_time
+            print(f"⏱️ 执行总耗时: {elapsed_time:.2f} 秒")
+            print(f"END---数据标准化任务结束")
 
 # --- 运行 ---
 async def main():
@@ -276,7 +378,6 @@ async def main():
 
     if result["status"] == "success":
         print(f"🎯 【数据标准化】任务完成!")
-        print(f"📂 标准化后文件输出位置: {result['file_path']}")
     else:
         print(f"❌ 任务失败: {result['message']}")
 

+ 354 - 0
llmops/folder_scanner.py

@@ -0,0 +1,354 @@
+import os
+import logging
+from typing import Dict, List, Optional
+from dataclasses import dataclass
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ScanConfig:
+    """扫描配置"""
+    # 决策agent的期望数量,-1表示不校验
+    expected_decision_count: int = -1
+    # 大纲生成agent的期望数量,-1表示不校验
+    expected_outline_count: int = -1
+    # 指标计算agent的期望数量,-1表示不校验
+    expected_metric_count: int = -1
+
+
+@dataclass
+class ScanResult:
+    """单个文件夹扫描结果"""
+    folder_name: str
+    decision_files: List[str]
+    outline_files: List[str]
+    metric_files: List[str]
+    success_metric_files: List[str]
+
+    # 实际数量
+    actual_decision_count: int
+    actual_outline_count: int
+    actual_metric_count: int
+    actual_success_metric_count: int
+
+    # 准确率(如果配置了期望值)
+    decision_accuracy: Optional[float] = None
+    outline_accuracy: Optional[float] = None
+    metric_accuracy: Optional[float] = None
+
+
+class FolderScanner:
+    """文件夹扫描工具类"""
+
+    def __init__(self, base_path: str = "."):
+        """
+        初始化扫描器
+
+        Args:
+            base_path: 基础路径,默认为当前目录
+        """
+        self.base_path = base_path
+        self.results: List[ScanResult] = []
+
+    def scan_folders(self, folder_count: int, config: Optional[ScanConfig] = None) -> Dict:
+        """
+        扫描指定数量的文件夹
+
+        Args:
+            folder_count: 要扫描的文件夹数量
+            config: 扫描配置,包含各agent的期望数量
+
+        Returns:
+            扫描统计结果
+        """
+        if config is None:
+            config = ScanConfig()
+
+        logger.info(f"开始扫描 {folder_count} 个文件夹...")
+        logger.info(f"期望配置: 决策agent={config.expected_decision_count}, "
+                    f"大纲agent={config.expected_outline_count}, "
+                    f"指标agent={config.expected_metric_count}")
+
+        self.results = []
+
+        for i in range(1, folder_count + 1):
+            folder_name = f"api_results_{i}"
+            folder_path = os.path.join(self.base_path, folder_name)
+
+            if not os.path.exists(folder_path):
+                logger.warning(f"文件夹不存在: {folder_path}")
+                continue
+
+            result = self._scan_single_folder(folder_name, folder_path, config)
+            self.results.append(result)
+
+        return self._generate_report(config)
+
+    def _calculate_accuracy(self, actual: int, expected: int) -> float:
+        """计算准确率"""
+        if actual == expected:
+            return 100.0
+        elif actual < expected:
+            # 实际少于期望
+            return (actual / expected) * 100
+        else:
+            # 实际多于期望,也应该计算偏差
+            # 比如期望3个,实际4个,准确率 = 3/4 * 100 = 75%
+            return (expected / actual) * 100
+
+    def _scan_single_folder(self, folder_name: str, folder_path: str, config: ScanConfig) -> ScanResult:
+        """扫描单个文件夹"""
+        all_files = os.listdir(folder_path)
+
+        # 分类文件
+        decision_files = []
+        outline_files = []
+        metric_files = []
+        success_metric_files = []
+
+        for file in all_files:
+            if file.endswith('.md'):
+                if '规划决策' in file:
+                    decision_files.append(file)
+                elif '大纲生成' in file:
+                    outline_files.append(file)
+            elif file.endswith('.json'):
+                metric_files.append(file)
+                if '_success.json' in file:
+                    success_metric_files.append(file)
+
+        # 计算实际数量
+        actual_decision_count = len(decision_files)
+        actual_outline_count = len(outline_files)
+        actual_metric_count = len(metric_files)
+        actual_success_metric_count = len(success_metric_files)
+
+        # 计算准确率(如果配置了期望值)
+        decision_accuracy = None
+        outline_accuracy = None
+        metric_accuracy = None
+
+        if config.expected_decision_count != -1 and config.expected_decision_count > 0:
+            decision_accuracy = self._calculate_accuracy(actual_decision_count, config.expected_decision_count)
+
+        if config.expected_outline_count != -1 and config.expected_outline_count > 0:
+            outline_accuracy = self._calculate_accuracy(actual_outline_count, config.expected_outline_count)
+
+        if config.expected_metric_count != -1 and config.expected_metric_count > 0:
+            metric_accuracy = self._calculate_accuracy(actual_metric_count, config.expected_metric_count)
+
+        return ScanResult(
+            folder_name=folder_name,
+            decision_files=decision_files,
+            outline_files=outline_files,
+            metric_files=metric_files,
+            success_metric_files=success_metric_files,
+            actual_decision_count=actual_decision_count,
+            actual_outline_count=actual_outline_count,
+            actual_metric_count=actual_metric_count,
+            actual_success_metric_count=actual_success_metric_count,
+            decision_accuracy=decision_accuracy,
+            outline_accuracy=outline_accuracy,
+            metric_accuracy=metric_accuracy
+        )
+
+    def _generate_report(self, config: ScanConfig) -> Dict:
+        """生成报告"""
+        total_folders = len(self.results)
+
+        # 计算平均准确率
+        avg_decision_accuracy = 0
+        avg_outline_accuracy = 0
+        avg_metric_accuracy = 0
+
+        if total_folders > 0:
+            # 决策agent平均准确率
+            decision_accuracies = [r.decision_accuracy for r in self.results if r.decision_accuracy is not None]
+            if decision_accuracies:
+                avg_decision_accuracy = sum(decision_accuracies) / len(decision_accuracies)
+
+            # 大纲agent平均准确率
+            outline_accuracies = [r.outline_accuracy for r in self.results if r.outline_accuracy is not None]
+            if outline_accuracies:
+                avg_outline_accuracy = sum(outline_accuracies) / len(outline_accuracies)
+
+            # 指标agent平均准确率
+            metric_accuracies = [r.metric_accuracy for r in self.results if r.metric_accuracy is not None]
+            if metric_accuracies:
+                avg_metric_accuracy = sum(metric_accuracies) / len(metric_accuracies)
+
+        # 计算综合成功率(基于所有配置的校验项)
+        comprehensive_success_rate = 0
+        success_items = []
+
+        # 添加所有配置的准确率
+        if config.expected_decision_count != -1:
+            success_items.append(avg_decision_accuracy)
+        if config.expected_outline_count != -1:
+            success_items.append(avg_outline_accuracy)
+        if config.expected_metric_count != -1:
+            success_items.append(avg_metric_accuracy)
+
+        if success_items:
+            comprehensive_success_rate = sum(success_items) / len(success_items)
+
+        # 统计成功指标文件总数
+        total_success_metric = sum(r.actual_success_metric_count for r in self.results)
+        total_metric_files = sum(r.actual_metric_count for r in self.results)
+
+        report = {
+            "total_folders_scanned": total_folders,
+            "accuracy_statistics": {
+                "comprehensive_success_rate": f"{comprehensive_success_rate:.2f}%",  # 综合成功率
+                "decision_accuracy": f"{avg_decision_accuracy:.2f}%" if config.expected_decision_count != -1 else "未配置",
+                "outline_accuracy": f"{avg_outline_accuracy:.2f}%" if config.expected_outline_count != -1 else "未配置",
+                "metric_accuracy": f"{avg_metric_accuracy:.2f}%" if config.expected_metric_count != -1 else "未配置",
+            },
+            "metric_statistics": {
+                "total_metric_files": total_metric_files,
+                "total_success_metric_files": total_success_metric,
+                "success_ratio": f"{(total_success_metric / total_metric_files * 100):.2f}%" if total_metric_files > 0 else "0.00%"
+            },
+            "expected_counts": {
+                "decision": config.expected_decision_count,
+                "outline": config.expected_outline_count,
+                "metric": config.expected_metric_count
+            },
+            "details": []
+        }
+
+        # 添加详细信息
+        for result in self.results:
+            detail = {
+                "folder": result.folder_name,
+                "actual_counts": {
+                    "decision": result.actual_decision_count,
+                    "outline": result.actual_outline_count,
+                    "metric": result.actual_metric_count,
+                    "success_metric": result.actual_success_metric_count
+                },
+                "accuracies": {}
+            }
+
+            # 只添加有准确率的项
+            if result.decision_accuracy is not None:
+                detail["accuracies"]["decision"] = f"{result.decision_accuracy:.2f}%"
+            if result.outline_accuracy is not None:
+                detail["accuracies"]["outline"] = f"{result.outline_accuracy:.2f}%"
+            if result.metric_accuracy is not None:
+                detail["accuracies"]["metric"] = f"{result.metric_accuracy:.2f}%"
+
+            report["details"].append(detail)
+
+        return report
+
+    def print_report(self, report: Dict):
+        """打印报告"""
+        logger.info("=" * 50)
+        logger.info("文件夹扫描统计报告")
+        logger.info("=" * 50)
+        logger.info(f"扫描文件夹总数: {report['total_folders_scanned']}")
+
+        # 准确率统计
+        logger.info("\n📊 准确率统计:")
+        acc_stats = report['accuracy_statistics']
+
+        # 显示综合成功率
+        logger.info(f"  综合成功率: {acc_stats['comprehensive_success_rate']}")
+
+        # 只显示配置了的准确率
+        if acc_stats['decision_accuracy'] != "未配置":
+            logger.info(f"  决策agent准确率: {acc_stats['decision_accuracy']}")
+        if acc_stats['outline_accuracy'] != "未配置":
+            logger.info(f"  大纲agent准确率: {acc_stats['outline_accuracy']}")
+        if acc_stats['metric_accuracy'] != "未配置":
+            logger.info(f"  指标agent准确率: {acc_stats['metric_accuracy']}")
+
+        # 指标文件统计
+        logger.info("\n📈 指标文件统计:")
+        metric_stats = report['metric_statistics']
+        logger.info(f"  指标文件总数: {metric_stats['total_metric_files']}")
+        logger.info(f"  成功指标文件数: {metric_stats['total_success_metric_files']}")
+        logger.info(f"  指标成功率: {metric_stats['success_ratio']}")
+
+        # 期望值显示
+        logger.info("\n⚙️ 配置期望值:")
+        expected = report['expected_counts']
+        if expected['decision'] != -1:
+            logger.info(f"  决策agent: {expected['decision']}个")
+        if expected['outline'] != -1:
+            logger.info(f"  大纲agent: {expected['outline']}个")
+        if expected['metric'] != -1:
+            logger.info(f"  指标agent: {expected['metric']}个")
+
+        logger.info("=" * 50)
+
+        # 打印前3个文件夹的详情(避免日志太长)
+        if report['details']:
+            logger.info("前3个文件夹详情:")
+            for i, detail in enumerate(report['details'][:3]):
+                logger.info(f"\n  📂 {detail['folder']}:")
+                actual = detail['actual_counts']
+
+                # 构建实际数量字符串
+                count_parts = []
+                if expected['decision'] != -1:
+                    count_parts.append(f"决策:{actual['decision']}")
+                if expected['outline'] != -1:
+                    count_parts.append(f"大纲:{actual['outline']}")
+                if expected['metric'] != -1:
+                    count_parts.append(f"指标:{actual['metric']}")
+                count_parts.append(f"成功指标:{actual['success_metric']}")
+
+                logger.info(f"    实际数量 - {', '.join(count_parts)}")
+
+                # 构建准确率字符串
+                if detail['accuracies']:
+                    accuracy_parts = []
+                    for key, value in detail['accuracies'].items():
+                        if key == 'decision':
+                            accuracy_parts.append(f"决策:{value}")
+                        elif key == 'outline':
+                            accuracy_parts.append(f"大纲:{value}")
+                        elif key == 'metric':
+                            accuracy_parts.append(f"指标:{value}")
+
+                    logger.info(f"    准确率 - {', '.join(accuracy_parts)}")
+
+            if len(report['details']) > 3:
+                logger.info(f"  ... 还有{len(report['details']) - 3}个文件夹未显示")
+
+        logger.info("=" * 50)
+
+
+# 使用示例
+def main():
+    """使用示例"""
+    # 创建扫描器
+    scanner = FolderScanner(base_path=".")
+
+    # 配置期望值
+    config = ScanConfig(
+        expected_decision_count=-1,  # -1 表示不校验决策agent文件数量
+        expected_outline_count=1,  # 期望每个文件夹有1个大纲生成agent文件
+        expected_metric_count=20  # 期望每个文件夹有20个指标计算agent文件
+    )
+
+    # 扫描2个文件夹(api_results_1 ~ api_results_2)
+    report = scanner.scan_folders(folder_count=2, config=config)
+
+    # 打印报告
+    scanner.print_report(report)
+
+    # 也可以返回报告数据供进一步处理
+    return report
+
+
+if __name__ == "__main__":
+    main()