فهرست منبع

数据标准化v2.0

chaixuhong 1 ماه پیش
والد
کامیت
455ef10929
1فایلهای تغییر یافته به همراه251 افزوده شده و 362 حذف شده
  1. 251 362
      llmops/agents/data_stardard.py

+ 251 - 362
llmops/agents/data_stardard.py

@@ -6,17 +6,18 @@ import csv
 import datetime
 import httpx
 import json
-import uuid
+import sqlite3
+import re
 
 # --- LangChain Imports ---
 from langchain_openai import ChatOpenAI
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.output_parsers import JsonOutputParser
 from langchain_core.outputs import Generation
-import re
 
-class SafeJsonOutputParser(JsonOutputParser):
 
+# --- 保持工具类不变 ---
+class SafeJsonOutputParser(JsonOutputParser):
     def parse_result(self, result, *, partial: bool = False):
         if isinstance(result, list) and len(result) > 0:
             generation = result[0]
@@ -25,147 +26,31 @@ class SafeJsonOutputParser(JsonOutputParser):
         else:
             raise ValueError(f"Unexpected result type: {type(result)}")
         text = generation.text
-        # 1️⃣ 去 <think>...</think>
         text = re.sub(r"<think>.*?</think>", "", text, flags=re.S).strip()
-
-        # 2️⃣ 去 ```json ``` 包裹
         text = re.sub(r"^```(?:json)?|```$", "", text, flags=re.I | re.M).strip()
-
-        # 3️⃣ ⭐ 只截取 JSON 本体
         match = re.search(r"(\[\s*{.*}\s*\]|\{\s*\".*\"\s*\})", text, flags=re.S)
         if not match:
-            raise ValueError(f"Invalid json output after clean: {text[:200]}")
-
+            # 兼容:有时候 LLM 可能直接返回 SQL 字符串而不是 JSON,这里做个简单的容错
+            if "SELECT" in text.upper():
+                return {"sql": text}
+            raise ValueError(f"Invalid json output: {text[:200]}")
         json_text = match.group(1)
         return json.loads(json_text)
 
 
-# --- 核心 Parser ---
 class TransactionParserAgent:
     def __init__(self, api_key: str, multimodal_api_url: str, base_url: str = "https://api.deepseek.com"):
-        # 1. 初始化 LangChain ChatOpenAI 客户端
-        # DeepSeek 完全兼容 OpenAI 接口,使用 ChatOpenAI 是标准做法
         self.llm = ChatOpenAI(
             model="deepseek-chat",
             api_key=api_key,
             base_url=base_url,
-            temperature=0.1,
-            max_retries=3,  # LangChain 内置重试机制
-            # 配置 httpx 客户端以优化超时和连接 (LangChain 允许透传 http_client)
-            http_client=httpx.Client(
-                timeout=httpx.Timeout(300.0, read=300.0, connect=300.0),
-                limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
-            )
+            temperature=0.0,  # SQL生成需要极其精确
+            max_retries=3,
+            http_client=httpx.Client(timeout=60.0)
         )
         self.multimodal_api_url = multimodal_api_url
-
-        # 定义 JSON 解析器
         self.parser = SafeJsonOutputParser()
 
-        # 初始化API调用跟踪
-        self.api_calls = []
-
-
-
-    def _validate_and_reconcile(self, parsed_data: list) -> list:
-        """
-        金额校验与余额矫正逻辑:
-        1. 正序排列
-        2. 寻找锚点 (符合前后余额勾稽关系)
-        3. 双向推演
-        4. 熔断机制 (40%)
-        """
-        if len(parsed_data) < 3:
-            return parsed_data
-        sorted_data = [];
-        # 按照日期和时间正序排列(最早的在第一条)
-        # 假设 txDate 为 YYYY-MM-DD, txTime 为 HH:mm:ss
-
-        def get_sort_key(x):
-            # 处理时间:空值用最大时间兜底
-            date = x.get('txDate', '9999-12-31')
-            time = x.get('txTime', '23:59:59')
-            # 处理txId:提取数字,异常则返回极大值(倒序时排最后)
-            try:
-                txid_num = int(''.join([c for c in x.get('txId', '') if c.isdigit()]))
-            except:
-                txid_num = float('inf')
-            return (date, time, -txid_num)
-
-        if parsed_data[0]["txDate"] > parsed_data[len(parsed_data)-1]["txDate"]:
-            sorted_data = sorted(parsed_data, key=get_sort_key)
-        def to_float(s):
-            try:
-                return round(float(s), 2)
-            except:
-                return 0.0
-
-        anchor_idx = -1
-        # 1. 寻找肯定正确的数据行(锚点)
-        # 逻辑:
-        # 1. 当前行余额  = 上一行余额 +- 当前行金额
-        #    当前行余额  = 下一行余额 -+ 下一行金额
-        for i in range(1, len(sorted_data) - 1):
-            p = sorted_data[i - 1]
-            c = sorted_data[i]
-            n = sorted_data[i + 1]
-
-            # 当前行余额
-            c_bal = to_float(c['txBalance'])
-
-            # 计算上一笔推导当前笔
-            p_bal = to_float(p['txBalance'])
-            c_amt = to_float(c['txAmount'])
-            calc_c_bal = round(p_bal + c_amt, 2) if c['txDirection'] == "收入" else round(p_bal - c_amt, 2)
-
-            # 计算当前笔推导下一笔
-            n_amt = to_float(n['txAmount'])
-            n_bal = to_float(n['txBalance'])
-            calc_n_bal = round(n_bal - n_amt, 2) if n['txDirection'] == "收入" else round(n_bal + n_amt, 2)
-            if c_bal == calc_c_bal and c_bal == calc_n_bal:
-                anchor_idx = i
-                break
-
-        if anchor_idx == -1:
-            print("⚠️ 无法找到勾稽关系吻合的锚点,判定为不连续流水,跳过矫正。")
-            return parsed_data
-
-        print(f"数据锚点index={anchor_idx}")
-        # 2. 从锚点开始向上下推演修正
-        new_data = [item.copy() for item in sorted_data]
-        fix_count = 0
-        print(sorted_data)
-        # 往下推 (未来)  当前行余额  = 下一行余额 -+ 下一行金额
-        for i in range(anchor_idx + 1, len(new_data)-1):
-            curr_bal = to_float(new_data[i]['txBalance'])
-            next_bal = to_float(new_data[i + 1]['txBalance'])
-            next_amt = to_float(new_data[i + 1]['txAmount'])
-            expected_bal = round(next_bal - next_amt, 2) if new_data[i+1]['txDirection'] == "收入" else round(next_bal + next_amt, 2)
-            if abs(to_float(curr_bal) - expected_bal) > 0.01:
-                print(f"【往下推】行{new_data[i]['txId']}余额为:{curr_bal},计算余额为:{expected_bal}")
-                new_data[i]['txBalance'] = str(expected_bal)
-                fix_count += 1
-
-        # 往上推 (过去) 当前行余额  = 上一行余额 +- 当前行金额
-        for i in range(anchor_idx - 1, 0, -1):  # 关键:终止条件改为0,避免i=0
-            curr_bal = to_float(new_data[i]['txBalance'])
-            pre_bal = to_float(new_data[i - 1]['txBalance'])
-            curr_amt = to_float(new_data[i]['txAmount'])
-            expected_bal = round(pre_bal + curr_amt, 2) if new_data[i]['txDirection'] == "收入" else round(pre_bal - curr_amt, 2)
-            if abs(to_float(curr_bal) - expected_bal) > 0.01:
-                print(f"【往上推】行{new_data[i]['txId']}余额为:{curr_bal},计算余额为:{expected_bal}")
-                new_data[i]['txBalance'] = str(expected_bal)
-                fix_count += 1
-
-        # 3. 熔断判定
-        fix_ratio = fix_count / len(new_data)
-        if fix_ratio > 0.4:
-            print(f"⚠️ 修正比例 {fix_ratio:.2%} 超过 40%,怀疑数据非连续,放弃修正。")
-            return parsed_data
-
-        print(f"✅ 余额勾稽校验通过,自动矫正了 {fix_count} 条数据偏差。")
-        return new_data
-
     async def _invoke_miner_u(self, file_path: str) -> str:
         """调用 MinerU 并提取纯行数据 (保持 httpx 调用不变,因为这不是 LLM)"""
         miner_start_time = time.perf_counter()
@@ -187,8 +72,8 @@ class TransactionParserAgent:
                         if 'md' in element:
                             full_md_list.append(element['md'])
                         if 'rows' in element:
-                            dealRows+=len(element['rows'])
-                    print(f"📊 提取结果:共提取 {dealRows-1} 条数据")
+                            dealRows += len(element['rows'])
+                    print(f"📊 提取结果:共提取 {dealRows - 1} 条数据")
                     return "\n\n".join(full_md_list)
                 return ""
         except Exception as e:
@@ -196,251 +81,263 @@ class TransactionParserAgent:
             return ""
         finally:
             print(f"✅ 【步骤1 - 数据提取】 执行完成")
-            print(f"⏱️  执行耗时:{ time.perf_counter() - miner_start_time:.2f} 秒")
-    def _get_csv_prompt_template(self) -> ChatPromptTemplate:
+            print(f"⏱️  执行耗时:{time.perf_counter() - miner_start_time:.2f} 秒")
+
+    # --- 🆕 核心逻辑:SQLite 转换引擎 ---
+    def _init_sqlite_db(self, data_rows: list, header_line: str, delimiter='|') -> tuple:
         """
-        构造 LangChain 的 Prompt 模板
+        将 Markdown 行数据灌入 SQLite 内存数据库的通用宽表
+        返回: (conn, header_mapping_info)
         """
+        # 1. 创建内存数据库
+        conn = sqlite3.connect(":memory:")
+        cursor = conn.cursor()
+
+        header_fingerprint = "".join(header_line.strip().strip('|').split())
+        header_added = False  # 确保数据库里只进一个表头
+
+        # 2. 分析最大列数,建立通用宽表 (row_id, c0, c1, ... c30)
+        max_cols = 0
+        parsed_rows = []
+        # 预处理:清洗 Markdown 分隔符
+        for row in data_rows:
+            # 去除首尾的 |
+            clean_row = row.strip().strip('|')
+
+            # A. 过滤掉纯分割线(如 | --- | --- |)
+            if not re.search(r'[\u4e00-\u9fa5a-zA-Z0-9]', clean_row):
+                continue
+
+            # B. 提取当前行的指纹
+            current_fingerprint = "".join(clean_row.split())
+
+            # C. 核心判断:
+            if current_fingerprint == header_fingerprint:
+                if not header_added:
+                    # 只有第一次见到表头指纹时,才放入数据库
+                    header_added = True
+                else:
+                    # 之后再见到一模一样的表头,直接跳过
+                    continue
+
+            # 分割
+            parts = [p.strip() for p in clean_row.split(delimiter)]
+            if len(parts) > max_cols:
+                max_cols = len(parts)
+            parsed_rows.append(parts)
+
+        if max_cols == 0:
+            return None, None
+
+        # 动态建表语句
+        cols_def = ", ".join([f"c{i} TEXT" for i in range(max_cols)])
+        create_sql = f"CREATE TABLE temp_raw_data (row_id INTEGER PRIMARY KEY AUTOINCREMENT, {cols_def});"
+        cursor.execute(create_sql)
+
+        # 3. 批量插入数据
+        insert_sql = f"INSERT INTO temp_raw_data ({', '.join([f'c{i}' for i in range(max_cols)])}) VALUES ({', '.join(['?' for _ in range(max_cols)])})"
+
+        # 补全数据(如果某行比最长行短,补None)
+        final_data = []
+        for p in parsed_rows:
+            padding = [None] * (max_cols - len(p))
+            final_data.append(p + padding)
+        cursor.executemany(insert_sql, final_data)
+        conn.commit()
+
+        return conn, max_cols
+
+    def _get_sql_generation_prompt(self) -> ChatPromptTemplate:
         system_template = """
 # Role
-你是一个高精度的银行账单转换工具。
+你是一个 SQLite 专家
 
 # Task
-将输入的 Markdown 表格行转换为 JSON 数组。
-
-# Field Rules
-1. txId: 如果输入数据中有交易流水号则直接使用,如果没有,从 T{start_id:04d} 开始递增生成。
-2. txDate: 交易日期,格式为YYYY-MM-DD
-3. txTime: 交易时间,格式为HH:mm:ss (未知填 00:00:00)
-4. txAmount: 交易金额,绝对值数字
-5. txBalance: 交易后余额。浮点数,移除千分位逗号。
-6. txDirection: 交易方向。必须根据以下逻辑判断只输出“收入”或“支出”:
-   - 若有“借/贷”列:“借”通常为支出,“贷”通常为收入(除非是信用卡,需结合表头)。
-   - 若有“收入/支出”分列:按列归类。
-   - 若金额带正负号:"+"为收入,"-"为支出。
-   - 如果无符号,请结合表头判断。
-7. txSummary: 摘要、用途、业务类型等备注。
-8. txCounterparty: 交易对手方(名称及账号,如有)。
-
-# Constraints
-- **强制输出格式**:
-  1. 严格返回一个包含对象的 JSON 数组。
-  2. 每个对象必须包含上述 8 个字段名作为 Key。
-  3. 不要输出任何解释文字或 Markdown 代码块标签。
-  
-# Anti-Hallucination Rules
-- 不得根据上下文推断任何未在原始数据中明确出现的字段
-- 不得计算或猜测余额
-- 不得根据常识补全对手方名称
-- 若字段缺失,必须返回空字符串 ""  
-"""
-        user_template = """# Input Data
-{chunk_data}
-
-# Output
-JSON Array:
-"""
-        return ChatPromptTemplate.from_messages([
-            ("system", system_template),
-            ("user", user_template)
-        ])
+你有一个名为 `temp_raw_data` 的表,里面存储了 OCR 识别后的原始数据。
+表的列名为 `c0`, `c1`, `c2`... `cN`。
+请根据提供的【表头】和【数据样本】,编写一条 SQL 查询语句,将原始列映射为标准输出字段。
+
+# Target Schema (Output Columns)
+你的 SQL 必须 `SELECT` 出以下字段(顺序不能变):
+1. `txId`: 交易流水号。如果原始数据没有,使用 `row_id`。
+2. `txDate`: 交易日期 (格式 YYYY-MM-DD)。
+3. `txTime`: 交易时间 (格式 HH:mm:ss)。如果没有则返回 '00:00:00'。
+4. `txAmount`: 交易金额 (绝对值数字,**必须去除逗号**,转为 REAL/FLOAT)。
+5. `txDirection`: 交易方向 (必须经过逻辑判断输出 '收入' 或 '支出')。
+6. `txBalance`: 余额 (去除逗号)。
+7. `txSummary`: 摘要/用途。
+8. `txCounterparty`: 对方账号/户名。
+
+# Logic Rules (Crucial!)
+1. **Direction Logic**:
+   - 如果有单独的借/贷列:通常 "借"=`支出`, "贷"=`收入`。
+   - 如果有单独的收入/支出列:哪一列有值就是哪个方向。
+   - 如果金额有正负号:负号通常是支出。
+   - 请使用 SQL 的 `CASE WHEN ... THEN ... ELSE ... END` 语法处理。
+2. **Data Cleaning**:
+   - 金额字段必须处理千分位逗号:`CAST(REPLACE(c?, ',', '') AS REAL)`
+   - 日期必须清洗。
+
+# Output JSON Format
+```json
+{{
+    "sql": "SELECT ... FROM temp_raw_data WHERE ..."
+}}
+        """
+        user_template = """
+
+        # Table Info
+        Max Columns: {max_cols} Generic Column Names: c0, c1, ... c{max_cols_minus_1}
+
+        # Data Preview (Header + First 3 Rows)
+        {data_preview}
+
+        # Instruction
+        请编写 SQL 语句来提取并清洗数据。 注意:不要包含 Markdown 的 sql 标签,直接返回 JSON。 忽略表头行(通常 row_id = 1 是表头,所以 WHERE row_id > 1)。 """
+        return ChatPromptTemplate.from_messages([("system", system_template), ("user", user_template)])
+
+    async def _generate_transform_sql(self, header_row: str, sample_rows: list, max_cols: int) -> str:
+        """让 LLM 编写 SQL"""
+        # 构建预览数据,带上 c0, c1 这种列名提示,方便 LLM 对应
+        preview_text = ""
+
+        # 表头预览
+        header_parts = [p.strip() for p in header_row.strip().strip('|').split('|')]
+        header_map = " | ".join([f"c{i}({val})" for i, val in enumerate(header_parts)])
+        preview_text += f"Mapping Hint: {header_map}\n"
+        preview_text += "-" * 50 + "\n"
+
+        # 数据预览
+        for row in sample_rows:
+            preview_text += row + "\n"
+
+        prompt_params = {
+            "max_cols": max_cols,
+            "max_cols_minus_1": max_cols - 1,
+            "data_preview": preview_text
+        }
+
+        chain = self._get_sql_generation_prompt() | self.llm | self.parser
+
+        print(f"🧠 [LLM] 正在生成 SQL 清洗逻辑...")
+        try:
+            result = await chain.ainvoke(prompt_params)
+            sql = result.get("sql")
+            print(f"💡 [LLM] 生成 SQL:\n{sql}")
+            return sql
+        except Exception as e:
+            print(f"❌ SQL 生成失败: {e}")
+            return ""
 
     async def parse_to_csv(self, file_path: str) -> str:
-        # 1. 获取完整 Markdown 文本并按行切分
+        # 1. 获取 Markdown
         md_text = await self._invoke_miner_u(file_path)
-        if not md_text:
-            return ""
+        if not md_text: return ""
+
         # 记录开始时间(使用time.perf_counter获取高精度时间)
-        switch_start_time = time.perf_counter()
+        start_time = time.perf_counter()
         print("\n" + "=" * 40)
         print("📌 【步骤2 - 标准化转换】 开始执行")
-        # 初步切分
+
+        # 2. 预处理数据行
         raw_lines = md_text.splitlines()
-        # 提取真正的第一行作为基准表头
-        clean_lines = [l.strip() for l in raw_lines if l.strip()]
-        if len(clean_lines) < 2: return ""
+        clean_lines = [l.strip() for l in raw_lines if l.strip() and "|" in l]
+        # 简单判定表头 (包含2个以上关键词)
+        header_line = ""
+        header_idx = 0
+        keywords = ["日期", "金额", "余额", "摘要", "用途", "借", "贷"]
+        for idx, line in enumerate(clean_lines):
+            if sum(1 for k in keywords if k in line) >= 2:
+                header_line = line
+                header_idx = idx
+                break
 
-        # --- 【核心改进:动态寻找表头】 ---
-        table_header = ""
-        header_index = 0
+        if not header_line:
+            header_line = clean_lines[0]
 
-        header_keywords = ["余额", "金额", "账号", "日期", "借/贷", "摘要"]
+        # 数据行 (保留原始数据,之后灌入 DB)
+        data_rows = clean_lines  # 把表头也灌进去,通过 row_id > header_idx + 1 来过滤
 
-        for idx, line in enumerate(clean_lines):
-            # 如果某一行包含 2 个以上关键词,且含有 Markdown 表格分隔符 '|'
-            hit_count = sum(1 for kw in header_keywords if kw in line)
-            if hit_count >= 2 and "|" in line:
-                table_header = line
-                header_index = idx
-                break
+        # 3. 灌入 SQLite
+        conn, max_cols = self._init_sqlite_db(data_rows,header_line)
+        if not conn:
+            return ""
 
-        if not table_header:
-            table_header = clean_lines[0]
-            header_index = 0
-        data_rows = []
-        for line in clean_lines[header_index + 1:]:
-            if all(c in '|- ' for c in line): continue
-            if line == table_header: continue
-            # 过滤掉一些 MinerU 可能在表格末尾产生的页码或无关文字
-            if "|" not in line: continue
-            data_rows.append(line)
-
-        csv_header = "txId,txDate,txTime,txAmount,txDirection,txBalance,txSummary,txCounterparty,createdAt\n"
-        csv_content = csv_header
-
-        batch_size = 15
-        global_tx_counter = 1
-
-        # 构建 LCEL Chain: Prompt -> LLM -> Parser
-        chain = self._get_csv_prompt_template() | self.llm | self.parser
-
-        # 2. 分块处理
-        for i in range(0, len(data_rows), batch_size):
-            chunk = data_rows[i: i + batch_size]
-            context_chunk = [table_header] + chunk
-            chunk_str = "\n".join(context_chunk)
-            # 1. 记录开始时间(使用time.perf_counter获取高精度时间)
-            start_time = time.perf_counter()
-            print(f"🔄 正在通过LLM转换批次 {i // batch_size + 1},包含 {len(chunk)} 条数据...")
-            # print(f"待转换的数据块:\n{chunk_str}")
-            try:
-                # --- LangChain 调用 ---
-                # 使用 ainvoke 异步调用链
-                # 记录API调用开始时间
-                call_start_time = datetime.datetime.now()
-
-                data_data = await chain.ainvoke({
-                    "start_id": global_tx_counter,
-                    "chunk_data": chunk_str
-                })
-
-                # 记录API调用结束时间
-                call_end_time = datetime.datetime.now()
-
-                # 记录API调用结果 - 简化版:只保存提示词和结果数据
-                call_id = f"api_llm_数据转换_{'{:.2f}'.format((call_end_time - call_start_time).total_seconds())}"
-
-                # 从chain中提取提示词(如果可能)
-                prompt_content = ""
-                try:
-                    # 尝试从chain获取最后的消息内容
-                    if hasattr(chain, 'get_prompts'):
-                        prompts = chain.get_prompts()
-                        if prompts:
-                            prompt_content = str(prompts[-1])
-                    else:
-                        # 如果无法获取,构造基本的提示词信息
-                        prompt_content = f"转换批次数据,start_id: {global_tx_counter}, chunk_data: {chunk_str[:200]}..."
-                except:
-                    prompt_content = f"转换批次数据,start_id: {global_tx_counter}, chunk_data: {chunk_str[:200]}..."
-
-                api_call_info = {
-                    "call_id": call_id,
-                    "start_time": call_start_time.isoformat(),
-                    "end_time": call_end_time.isoformat(),
-                    "duration": (call_end_time - call_start_time).total_seconds(),
-                    "prompt": prompt_content,
-                    "input_params": {
-                        "start_id": global_tx_counter,
-                        "chunk_data": chunk_str
-                    },
-                    "llm_result": data_data
-                }
-                self.api_calls.append(api_call_info)
-
-                # 保存API结果到文件 (Markdown格式,更易阅读)
-                # 使用运行ID创建独立的文件夹
-                run_id = os.environ.get('FLOW_RUN_ID', 'default')
-                api_results_dir = f"api_results_{run_id}"
-                os.makedirs(api_results_dir, exist_ok=True)
-                timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-                filename = f"{timestamp}_{call_id}.md"
-                filepath = os.path.join(api_results_dir, filename)
+        try:
+            # 4. LLM 生成 SQL
+            # 取表头和前3条数据作为样本
+            sample_data = clean_lines[header_idx:header_idx + 4]
+            sql_query = await self._generate_transform_sql(header_line, sample_data, max_cols)
+
+            if not sql_query:
+                return ""
+
+            # 5. 执行 SQL
+            cursor = conn.cursor()
+
+            # 为了安全,确保 SQL 只是 SELECT
+            if "DROP" in sql_query.upper() or "DELETE" in sql_query.upper():
+                raise ValueError("Unsafe SQL detected")
+
+            # 有时候 LLM 忘记过滤表头,我们强制在 SQL 外层或提示中处理
+            # 这里的简单做法是假设 SQL 正确,或者在 SQL 后追加 limit 测试
 
+            print(f"🚀 [SQLite] 执行查询...")
+            cursor.execute(sql_query)
+            results = cursor.fetchall()
+
+            print(f"✅ 提取成功,共 {len(results)} 条数据")
+
+            # 6. 导出为 CSV 字符串
+            output = io.StringIO()
+            writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
+
+            # 写入标准表头
+            csv_header = ["txId", "txDate", "txTime", "txAmount", "txDirection", "txBalance", "txSummary",
+                          "txCounterparty", "createdAt"]
+            writer.writerow(csv_header)
+
+            created_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+            for row in results:
+                # row 是元组 (id, date, time, amt, dir, bal, sum, counter)
+                # 转换 tuple 为 list 并添加 createdAt
+                row_list = list(row)
+
+                # --- 🆕 新增:txAmount 取绝对值逻辑 ---
                 try:
-                    with open(filepath, 'w', encoding='utf-8') as f:
-                        f.write("# 数据转换结果\n\n")
-                        f.write("## 调用信息\n\n")
-                        f.write(f"- 调用ID: {call_id}\n")
-                        f.write(f"- 开始时间: {call_start_time.isoformat()}\n")
-                        f.write(f"- 结束时间: {call_end_time.isoformat()}\n")
-                        f.write(f"- 执行时长: {(call_end_time - call_start_time).total_seconds():.2f} 秒\n")
-                        f.write("\n## 提示词入参\n\n")
-                        f.write("```\n")
-                        f.write(api_call_info["prompt"])
-                        f.write("\n```\n\n")
-                        f.write("## 输入参数\n\n")
-                        f.write("```json\n")
-                        f.write(json.dumps(api_call_info["input_params"], ensure_ascii=False, indent=2))
-                        f.write("\n```\n\n")
-                        f.write("## LLM返回结果\n\n")
-                        f.write("```json\n")
-                        f.write(json.dumps(api_call_info["llm_result"], ensure_ascii=False, indent=2))
-                        f.write("\n```\n")
-                    print(f"[API_RESULT] 保存API结果文件: {filepath}")
-                except Exception as e:
-                    print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
-
-                # print(f"💡 LLM 返回数据: {data_data}")
-
-                # 兼容处理:LangChain Parser 通常会直接返回 List 或 Dict
-                if isinstance(data_data, dict):
-                    # 尝试寻找 transactions 键,如果没有则假设整个 dict 就是我们要的对象(虽然罕见)
-                    batch_data = data_data.get("transactions", [data_data])
-                    # 如果取出来还是 dict (例如单条记录),包一层 list
-                    if isinstance(batch_data, dict):
-                        batch_data = [batch_data]
-                elif isinstance(data_data, list):
-                    batch_data = data_data
-                else:
-                    batch_data = []
-
-                if batch_data:
-                    final_list = self._validate_and_reconcile(batch_data)
-                    final_list = sorted(final_list, key=lambda x: (x['txId']))
-                    output = io.StringIO()
-                    createdAtStr = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
-
-                    print(f"✅ 批次转换成功,包含 {len(final_list)} 条记录。")
-
-                    for item in final_list:
-                        writer.writerow([
-                            item.get("txId", ""),
-                            item.get("txDate", ""),
-                            item.get("txTime", ""),
-                            item.get("txAmount", ""),
-                            item.get("txDirection", ""),
-                            item.get("txBalance", ""),
-                            item.get("txSummary", ""),
-                            item.get("txCounterparty", ""),
-                            createdAtStr
-                        ])
-
-                    batch_csv_string = output.getvalue()
-                    csv_content += batch_csv_string
-
-                    global_tx_counter += len(final_list)
-
-            except Exception as e:
-                print(f"⚠️ 批次执行失败: {e}")
-            finally:
-                end_time = time.perf_counter()
-                elapsed_time = end_time - start_time
-                print(f"⏱️  执行耗时: {elapsed_time:.2f} 秒")
-        print(f"📊 转换结果:共转换 {global_tx_counter - 1} 条数据")
-        print(f"✅ 【步骤2 - 标准化转换】 执行完成")
-        print(f"⏱️  执行总耗时:{time.perf_counter() - switch_start_time:.2f} 秒")
-        return csv_content
+                    raw_amount = str(row_list[3]).replace(',', '')  # 再次确保去除逗号
+                    if raw_amount:
+                        # 转换为浮点数取绝对值,再转回字符串(或保持 float)
+                        row_list[3] = abs(float(raw_amount))
+                except (ValueError, TypeError):
+                    # 如果转换失败(例如识别到了文字),保持原样或设为 0.0
+                    print(f"⚠️ 金额转换失败: {row_list[3]}")
+                    row_list[3] = 0.0
+
+                # 安全性清洗:处理可能的 None
+                row_list = [str(x) if x is not None else "" for x in row_list]
+
+                # 确保只取前8个字段 (以防 LLM 多选了)
+                final_row = row_list[:8] + [created_at]
+                writer.writerow(final_row)
+            return output.getvalue()
+
+        except sqlite3.Error as e:
+            print(f"❌ SQLite 执行错误: {e}")
+            # 可以在这里做一个重试机制:把错误信息返给 LLM 让它修正 SQL
+            return ""
+        finally:
+            conn.close()
+            print(f"✅ 【步骤2 - 标准化转换】 执行完成")
+            print(f"⏱️ 总耗时: {time.perf_counter() - start_time:.2f} 秒")
 
+    # --- 流程入口 ---
     async def parse_and_save_to_file(self, file_path: str, output_dir: str = "output") -> str:
-        """
-        供 Workflow 调用:解析并保存文件,返回全路径名
-        """
         current_script_path = os.path.abspath(__file__)
         current_dir = os.path.dirname(current_script_path)
         file_full_name = os.path.basename(file_path)
-        file_name = os.path.splitext(file_full_name)[0]  # 不带后缀 11111
+        file_name = os.path.splitext(file_full_name)[0]
         output_dir = os.path.normpath(os.path.join(current_dir, "..", "..", output_dir))
 
         os.makedirs(output_dir, exist_ok=True)
@@ -458,17 +355,13 @@ JSON Array:
             raise Exception("数据解析失败,未生成有效内容")
 
     async def run_workflow_task(self, input_file_path: str) -> dict:
-        """
-        标准 Workflow 入口方法
-        """
         # 1. 记录开始时间(使用time.perf_counter获取高精度时间)
         start_time = time.perf_counter()
         print(f"BEGIN---数据标准化任务开始---")
         try:
             print(f"待执行标准化的文件:{input_file_path}")
-            api_results_dir = "data_files"
-            saved_path = await self.parse_and_save_to_file(input_file_path, api_results_dir)
-
+            saved_path = await self.parse_and_save_to_file(input_file_path, "data_files")
+            print(f"结果文件保存至:{saved_path}")
             return {
                 "status": "success",
                 "file_path": saved_path,
@@ -476,10 +369,7 @@ JSON Array:
                 "timestamp": datetime.datetime.now().isoformat()
             }
         except Exception as e:
-            return {
-                "status": "error",
-                "message": str(e)
-            }
+            return {"status": "error", "message": str(e)}
         finally:
             end_time = time.perf_counter()
             elapsed_time = end_time - start_time
@@ -487,14 +377,13 @@ JSON Array:
             print(f"END---数据标准化任务结束")
 
 
-
 async def data_standize(api_key: str, base_url: str, multimodal_api_url: str, input_file_path: str) -> dict:
     """
     数据标准化入口方法
     """
     # 创建Agent
     agent = TransactionParserAgent(
-        api_key="sk-8634dbc2866540c4b6003bb5733f23d8",
+        api_key=api_key,
         base_url=base_url,
         multimodal_api_url=multimodal_api_url
     )
@@ -511,7 +400,7 @@ async def main():
     current_script_path = os.path.abspath(__file__)
     current_dir = os.path.dirname(current_script_path)
     # 模拟 Workflow 传入一个待处理文件
-    input_pdf = "data_files/4.pdf"
+    input_pdf = "data_files/11111.png"
     filepath = os.path.normpath(os.path.join(current_dir, "..", "..", input_pdf))
 
     if not os.path.exists(filepath):