Pārlūkot izejas kodu

智能体代码完成,提交

wangyang 1 nedēļu atpakaļ
vecāks
revīzija
f5cd9b03e6

+ 2 - 0
.idea/modules.xml

@@ -2,6 +2,8 @@
 <project version="4">
   <component name="ProjectModuleManager">
     <modules>
+      <module fileurl="file://$PROJECT_DIR$/../aireport/.idea/aireport.iml" filepath="$PROJECT_DIR$/../aireport/.idea/aireport.iml" />
+      <module fileurl="file://$USER_HOME$/Downloads/big_agent/.idea/big_agent.iml" filepath="$USER_HOME$/Downloads/big_agent/.idea/big_agent.iml" />
       <module fileurl="file:///Applications/work/宇信科技/智能数据平台/llmops/.idea/llmops.iml" filepath="/Applications/work/宇信科技/智能数据平台/llmops/.idea/llmops.iml" />
       <module fileurl="file://$PROJECT_DIR$/.idea/tx_flow_analysis.iml" filepath="$PROJECT_DIR$/.idea/tx_flow_analysis.iml" />
     </modules>

+ 2 - 0
.idea/tx_flow_analysis.iml

@@ -5,5 +5,7 @@
     <orderEntry type="jdk" jdkName="Python 3.1 (2)" jdkType="Python SDK" />
     <orderEntry type="sourceFolder" forTests="false" />
     <orderEntry type="module" module-name="llmops" />
+    <orderEntry type="module" module-name="aireport" />
+    <orderEntry type="module" module-name="big_agent" />
   </component>
 </module>

+ 2 - 1
README

@@ -8,4 +8,5 @@
 pip install langchain
 pip install langchain_openai
 pip install fastapi
-pip install uvicorn
+pip install uvicorn
+pip install pdfplumber  # pdf交易流水单解析需要

+ 596 - 0
llmops/agents/chat_bot.py

@@ -0,0 +1,596 @@
+
+from typing import TypedDict, Any, List, Dict
+
+from langgraph.graph import StateGraph, START, END
+from langchain_core.messages import AIMessage, HumanMessage, BaseMessage
+import pandas as pd
+import json
+
+from langgraph.prebuilt import create_react_agent
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+from llmops.agents.datadev.memory.memory_saver_with_expiry2 import MemorySaverWithExpiry
+
+from llmops.agents.datadev.llm import get_llm
+from llmops.agents.tools.unerstand_dataset_tool import understand_dataset_structure
+
+import logging
+
+logger = logging.getLogger(__name__)
+logging.basicConfig(level=logging.DEBUG,
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+                    datefmt='%Y-%m-%d %H:%M:%S')
+
class AgentState(TypedDict):
    """
    Shared state for the ChatBot agent graph.

    All keys are optional at runtime (nodes return partial updates);
    the graph merges them into the checkpointed state.
    """
    session: str          # session id (ask_question passes it into graph.invoke)
    question: str         # user question
    data_set: List[Any]   # raw data set, List[Dict]
    transactions_df: pd.DataFrame       # data set as a pandas DataFrame
    intent_num: str  # intent classification id ("1"-"8")
    history_messages: List[BaseMessage]  # conversation history
    bank_type: str   # bank type detected from an uploaded statement
    analysis_results: Dict[str, Any]   # analysis results
    current_step: str                  # current processing step
    file_path: str                     # data file path
    file_hash: str                     # data file hash
    status: str     # "success" | "error"
    answer: Any     # AI answer
+
class TxFlowAnalysisAgent:
    """Transaction-flow analysis agent built on a LangGraph state machine.

    A user question is first classified into one of eight intents, then
    routed to the matching handler node; the handler writes its result into
    the "answer" state key, which ask_question() returns to the caller.
    """

    def __init__(self):
        # self.llm_orchestrator = LLMOrchestrator()
        self.llm = get_llm()

        # Conversation memory: entries expire after 600s, cleaned every 60s.
        self.memory = MemorySaverWithExpiry(expire_time=600, clean_interval=60)

        # Compiled state graph, checkpointed in the expiring memory.
        self.graph = self._build_graph()

    # Handler node names, shared by graph construction and routing.
    _HANDLER_NODES = (
        "_say_hello", "_intent_clarify", "_beyond_ability", "_data_op",
        "_gen_report", "_analysis", "_statistics", "_query", "_invalid",
    )

    def _build_graph(self):
        """Build and compile the intent-routing state graph."""
        graph_builder = StateGraph(AgentState)

        # Entry node plus one handler node per intent.
        graph_builder.add_node("_intent_recognition", self._intent_recognition)
        for name in self._HANDLER_NODES:
            graph_builder.add_node(name, getattr(self, name))

        # Every handler terminates the graph run.
        graph_builder.add_edge(START, "_intent_recognition")
        for name in self._HANDLER_NODES:
            graph_builder.add_edge(name, END)

        # Route from intent recognition to the handler chosen by _router.
        graph_builder.add_conditional_edges(
            source="_intent_recognition",
            path=self._router,
            path_map={name: name for name in self._HANDLER_NODES},
        )

        return graph_builder.compile(checkpointer=self.memory)

    def _intent_recognition(self, state: "AgentState"):
        """
        Classify the user question into an intent number ("1"-"8").

        :param state: current agent state (reads question / history_messages)
        :return: partial state update with intent_num and refreshed history,
                 or {"status": "error"} on failure
        """
        template = """
            你是一位银行流水分析助手,专门识别用户问题的意图类别。

            ### 任务说明
            根据用户问题和历史对话,判断用户意图属于以下哪个类别。直接输出对应的数字编号。

            ### 历史对话:
            {history_messages}

            ### 当前用户问题:
            {question}

            ### 意图分类体系:
            1. **流水查询与检索** - 查找特定交易记录
               - 示例:"查一下昨天给张三的转账"、"找一笔5000元的交易"
            2. **统计与汇总** - 计算数值、统计信息
               - 示例:"本月总支出多少"、"工资收入总计多少"
            3. **洞察与分析** - 分析模式、趋势、异常
               - 示例:"我的消费趋势如何"、"有没有异常交易"
            4. **生成报告**  - 生成一份分析报告
               - 示例:"生成一份收入与支持的分析报告"
            5. **数据操作与证明** - 验证交易
               - 示例:"导出本月流水"、"查这笔交易的流水号"
            6. **超出能力边界** - 系统无法处理的问题
               - 示例:"帮我转账"、"下个月我能存多少钱"
            7. **意图澄清** - 问题模糊需要进一步确认
               - 示例:"花了多少钱"(无时间范围)、"查一下交易"(无具体条件)
            8. ** 打招呼 **
               - 示例: "你好"

            ### 输出规则:
            - 只输出1个数字(1-8),不要任何解释
            - 如果问题不明确,优先选7(意图澄清)
            - 如果问题超出流水分析范围,选6(超出能力边界)
            - 如果有多个意图,选最主要的一个
        """
        pt = ChatPromptTemplate.from_template(template)
        chain = pt | self.llm

        # Render history as plain text for the prompt.
        history_messages = state.get("history_messages", [])
        history_str = "\n".join(f"{msg.type}: {msg.content}" for msg in history_messages)

        try:
            response = chain.invoke({"question": state["question"], "history_messages": history_str})
            logger.info(f"对用户问题:{state['question']} 进行意图识别:{response.content}")

            # Append the new exchange, keeping at most the last 100 messages.
            new_question = HumanMessage(content=state["question"])
            ai_message = AIMessage(content=response.content)
            new_history = history_messages + [new_question, ai_message]

            return {"intent_num": response.content, "history_messages": new_history[-100:]}
        except Exception as e:
            # BUGFIX: was print(); route all error reporting through the logger.
            logger.error(f"用户问题:{state['question']}意图识别异常,{str(e)}")
            return {"status": "error"}

    def _router(self, state: "AgentState"):
        """Map the recognized intent number to a handler node name."""
        routes = {
            "8": "_say_hello",       # greeting
            "7": "_intent_clarify",  # intent clarification
            "6": "_beyond_ability",  # beyond capability
            "5": "_data_op",         # data operation / proof
            "4": "_gen_report",      # report generation
            "3": "_analysis",        # insight analysis
            "2": "_statistics",      # statistics / aggregation
            "1": "_query",           # transaction detail query
        }
        return routes.get(state["intent_num"], "_invalid")

    def _say_hello(self, state: "AgentState"):
        """
        Reply to a greeting from the user.

        :param state: current agent state (reads question)
        :return: partial state update with the answer
        """
        template = """
            你是交易流水分析助手,用户在跟你打招呼,请合理组织话术并进行回答。

            ### 用户说
            {question}

            ### 回答要求
            - 友好
            - 专业
            - 快速
        """
        pt = ChatPromptTemplate.from_template(template)
        chain = pt | self.llm
        try:
            response = chain.invoke({"question": state["question"]})
            logger.info(f"用户说:{state['question']}, AI回答: {response.content}")
            return {"answer": response.content}
        except Exception as e:
            # BUGFIX: was print(); keep error reporting on the logger.
            logger.error(f"say_hello异常 {str(e)}")
            return {"status": "error", "message": str(e)}

    def _intent_clarify(self, state: "AgentState"):
        """
        Ask the user a clarifying question when the intent is ambiguous.

        :param state: current agent state (reads question / history_messages)
        :return: partial state update with the answer and refreshed history
        """
        template = """
            你是交易流水分析助手,根据用户问题和历史对话,对用户的问题进行反问,以便充分理解用户意图。

            ### 用户问题
            {question}

            ### 历史对话
            {history_messages}
        """
        pt = ChatPromptTemplate.from_template(template)
        # Render history as plain text for the prompt.
        history_messages = state.get("history_messages", [])
        history_str = "\n".join(f"{msg.type}: {msg.content}" for msg in history_messages)

        chain = pt | self.llm
        try:
            response = chain.invoke({
                "question": state["question"],
                "history_messages": history_str
            })

            # Append the new exchange, keeping at most the last 100 messages.
            new_question = HumanMessage(content=state["question"])
            ai_message = AIMessage(content=response.content)
            new_history = history_messages + [new_question, ai_message]

            return {"answer": response.content, "history_messages": new_history[-100:]}
        except Exception as e:
            logger.error(f"_intent_clarify 异常:{str(e)}")
            return {"status": "error", "message": str(e)}

    def _beyond_ability(self, state: "AgentState"):
        """
        Politely decline questions outside the agent's capabilities.

        :param state: current agent state (reads question)
        :return: partial state update with the answer
        """
        template = """
            你是交易流水分析助手,对于用户的问题,已超出你的能力边界,组合合理的话术回答用户。

            ### 用户问题
            {question}

            ### 要求
            - 友好
            - 专业
            - 快速
        """
        pt = ChatPromptTemplate.from_template(template)
        chain = pt | self.llm
        try:
            response = chain.invoke({"question": state["question"]})
            return {"answer": response.content}
        except Exception as e:
            logger.error(f"_beyond_ability 异常:{str(e)}")
            return {"status": "error", "message": str(e)}

    def _data_op(self, state: "AgentState"):
        """Data operation / proof node — not implemented yet."""
        return {"answer": "暂不支持"}

    def _gen_report(self, state: "AgentState"):
        """Report generation node — placeholder returning a stub report."""
        # NOTE: removed an unused empty prompt-template local left over here.
        return {"answer": {"report": {"title": "this is title", "content": "this is content"}}}

    def _analysis(self, state: "AgentState"):
        """Insight analysis node — placeholder."""
        return {"answer": "这是洞察分析结果"}

    def _statistics(self, state: "AgentState"):
        """Statistics / aggregation node — placeholder."""
        return {"answer": "这是统计汇总处理结果"}

    def _query(self, state: "AgentState"):
        """
        Transaction retrieval node: a ReAct agent searches the provided
        data set for records matching the user question.

        :param state: current agent state (reads question / data_set)
        :return: partial state update with the answer
        """
        # Tool set: lets the agent inspect the data-set structure first.
        tools = [understand_dataset_structure]

        system_prompt = f"""
        你是交易流水分析助手,根据用户问题和已有的数据集,检索出对应的数据并返回。

        ### 用户问题
        {state['question']}

        ### 已有数据集
        {state['data_set']}

        ### 要求
        - 首先要理解 数据集结构
        - 必须在 已有数据集(已提供) 中进行检索
        - 检索出 全部 符合要求的记录
        - 如果没有符合要求的记录,则友好提示
        """
        try:
            agent = create_react_agent(model=self.llm, tools=tools, prompt=system_prompt)
            response = agent.invoke({
                "messages": [
                    {
                        "role": "user",
                        "content": f"""
                            ### 用户问题
                            {state['question']}
                        """
                    }
                ]
            })
            answer = response["messages"][-1].content

            # The agent is expected to reply with JSON; fall back to friendly
            # text when the payload is empty or malformed.
            try:
                result = json.loads(answer)
                if not result:
                    answer = "没有找到符合要求的记录"
            except json.JSONDecodeError:
                answer = "LLM 的回答格式不正确,请重新提问"

            logger.info(f"用户问题:{state['question']}, AI回答: {answer}")
            return {"answer": answer}
        except Exception as e:
            logger.error(f"_query 异常:{str(e)}")
            return {"status": "error", "message": str(e)}

    def _invalid(self, state: "AgentState"):
        """Fallback node for unclassifiable intent numbers."""
        return {"answer": "这是一个无效问题"}

    def _knowledge_save(self, state: "AgentState"):
        """
        Extract and persist knowledge from the Q/A exchange.
        Not implemented yet — intentionally a no-op stub.
        """

    async def process_upload(self, state: "AgentState") -> "AgentState":
        """Handle a PDF statement upload: parse, classify bank, summarize.

        NOTE(review): this method references self.pdf_tools, AnalysisTools,
        self._categorize_transaction and a state["messages"] key, none of
        which are defined in this file — it appears copied from another
        module and will raise if invoked as-is; confirm before wiring it in.
        """
        print("🔍 处理文件上传...")
        import os
        file_path = state.get("file_path", "")
        if not file_path or not os.path.exists(file_path):
            state["messages"].append(AIMessage(content="未找到上传的流水文件,请重新上传。"))
            return state

        try:
            # Extract raw text from the PDF.
            pdf_text = self.pdf_tools.extract_text_from_pdf(file_path)

            # Detect which bank issued the statement.
            bank_type = self.pdf_tools.detect_bank_type(pdf_text)
            state["bank_type"] = bank_type

            # Only China Merchants Bank statements are parsed for now.
            if "招商银行" in bank_type:
                transactions_json = self.pdf_tools.parse_cmb_statement(pdf_text)
                transactions = json.loads(transactions_json)

                # Normalize into a date-sorted DataFrame.
                df = pd.DataFrame(transactions)
                df['date'] = pd.to_datetime(df['date'])
                df = df.sort_values('date')

                # Categorize each transaction by its description.
                df['category'] = df['description'].apply(
                    lambda x: self._categorize_transaction(x)
                )

                state["transactions_df"] = df

                # Summary statistics for the confirmation message.
                self.analysis_tools = AnalysisTools(df)
                summary = self.analysis_tools.get_summary_statistics()

                response = f"""
                    ✅ 文件解析成功!

                    📊 基本信息:
                    - 银行类型:{bank_type}
                    - 交易笔数:{summary['total_transactions']} 笔
                    - 时间范围:{summary['date_range']['start']} 至 {summary['date_range']['end']}
                    - 总收入:¥{summary['total_income']:,.2f}
                    - 总支出:¥{summary['total_expense']:,.2f}
                    - 净现金流:¥{summary['net_cash_flow']:,.2f}

                    您可以问我:
                    1. "分析我的消费模式"
                    2. "检测异常交易"
                    3. "生成财务报告"
                    4. "查询大额交易"
                    5. "预测未来余额"
                """

                state["messages"].append(AIMessage(content=response))

            else:
                state["messages"].append(AIMessage(
                    content=f"检测到{bank_type},当前主要支持招商银行格式,其他银行功能正在开发中。"
                ))

            state["current_step"] = "analysis_ready"

        except Exception as e:
            state["messages"].append(AIMessage(
                content=f"文件解析失败:{str(e)}"
            ))

        return state

    def ask_question(self, session: str, question: str, data_set_file: str = None) -> dict:
        """
        Program entry point: ask a question within a session.

        :param session:       session id, used as the checkpoint thread id
        :param question:      user question
        :param data_set_file: optional JSON data file (List[Dict])
        :return: {"status": "success"|"error", "answer": ...}
        """
        import json
        import os

        data_set = []
        # BUGFIX: df was only bound inside the file-loading branch, so
        # graph.invoke() below raised NameError whenever no file was given
        # or the data set came from session memory. Always bind it.
        df = pd.DataFrame()
        try:
            logger.info(f"传入的数据文件路径: {data_set_file}")
            if data_set_file:
                if os.path.exists(data_set_file):
                    with open(data_set_file, 'r', encoding='utf-8') as file:
                        data_set = json.load(file)
                        logger.info(f"加载数据条数:{len(data_set)}")
                        # Pre-convert the date column once, up front.
                        df = pd.DataFrame(data_set)
                        df['txDate'] = pd.to_datetime(df['txDate'])
                else:
                    logger.error(f"数据文件:{data_set_file} 不存在,请检查路径!")
            else:
                logger.info("未传入数据集文件")
        except Exception as e:
            logger.error(f"读取数据文件:{data_set_file}异常,{str(e)}")
            return {
                "status": "error",
                "message": f"读取数据文件:{data_set_file}异常,{str(e)}"
            }

        if not session or not question:
            return {
                "status": "error",
                "message": "缺少参数"
            }
        result = {
            "status": "success"
        }
        try:
            config = {"configurable": {"thread_id": session}}
            current_state = self.memory.get(config)
            history_messages = current_state["channel_values"]["history_messages"] if current_state else []
            if len(data_set) == 0:  # No data set supplied for this call.
                data_set = current_state["channel_values"]["data_set"] if current_state else []
                # Rebuild the DataFrame from the recovered data set.
                if data_set and df.empty:
                    df = pd.DataFrame(data_set)
                    if 'txDate' in df.columns:
                        df['txDate'] = pd.to_datetime(df['txDate'])

            # Run the graph for this turn.
            response = self.graph.invoke({
                "session": session,
                "question": question,
                "data_set": data_set,
                "transactions_df": df,
                "history_messages": history_messages,
            }, config)
            result["answer"] = response.get("answer", "")
        except Exception as e:
            # BUGFIX: was print(); keep error reporting on the logger.
            logger.error(f"用户对话失败,异常:{str(e)}")
            result["status"] = "error"

        return result
+
+
# ==================== Build the agent graph ====================
def create_bank_statement_agent():
    """Create the bank-statement analysis agent graph.

    NOTE(review): this wires "analyze_query" and "route_query" from
    TxFlowAnalysisAgent, but neither is defined on that class in this file —
    building/running this graph will raise AttributeError. Confirm whether
    these methods live elsewhere or this function is dead code.
    """

    # Node provider instance.
    nodes = TxFlowAnalysisAgent()

    # State graph over the shared AgentState schema.
    workflow = StateGraph(AgentState)

    # Nodes: upload handling, query analysis, and an idle pass-through.
    workflow.add_node("process_upload", nodes.process_upload)
    workflow.add_node("analyze_query", nodes.analyze_query)
    workflow.add_node("waiting_for_input", lambda state: state)  # idle / wait for input

    # Entry point: start idle.
    workflow.set_entry_point("waiting_for_input")

    # Conditional routing out of the idle node.
    workflow.add_conditional_edges(
        "waiting_for_input",
        nodes.route_query,
        {
            "process_upload": "process_upload",
            "analyze_query": "analyze_query",
            "waiting_for_input": "waiting_for_input"
        }
    )

    # Both workers return to the idle node.
    workflow.add_edge("process_upload", "waiting_for_input")
    workflow.add_edge("analyze_query", "waiting_for_input")

    # Compile the graph.
    graph = workflow.compile()

    return graph
+
if __name__ == '__main__':
    # Smoke test: run a few questions through the agent against a local file.
    agent = TxFlowAnalysisAgent()
    question = "你好"
    # Data-set file (NOTE(review): hard-coded absolute path — machine-specific).
    data_file = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json"
    result = agent.ask_question(session="s1", question=question, data_set_file=data_file)
    print(f"问题:{question}, 响应:{result}")

    question = "查询交易日期是2023-01-05对应的收入记录"
    result = agent.ask_question(session="s1", question=question, data_set_file=data_file)
    print(f"问题:{question}, 响应:{result}")

    question = "查询交易对手是绿源农产品公司的记录"
    result = agent.ask_question(session="s1", question=question, data_set_file=data_file)
    print(f"问题:{question}, 响应:{result}")

    # question = "查找转给贾强的交易记录"
    # result = agent.ask_question(session="s1", question=question)
    # print(f"问题:{question}, 意图:{result}")
    # question = "花了多少钱"
    # result = agent.ask_question(session="s1", question=question)
    # print(f"问题:{question}, 意图:{result}")
    # question = "统计用户总收入"
    # result = agent.ask_question(session="s1", question=question)
    # print(f"问题:{question}, 意图:{result}")
    # question = "生成分析报告"
    # result = agent.ask_question(session="s1", question=question)
    # print(f"问题:{question}, 意图:{result}")
    # question = "转账给张三"
    # result = agent.ask_question(session="s1", question=question)
    # print(f"问题:{question}, 意图:{result}")
    # question = "生成一份异常分析报告"
    # result = agent.ask_question(session="s1", question=question)
    # print(f"问题:{question}, 意图:{result}")

+ 133 - 0
llmops/agents/data_manager.py

@@ -0,0 +1,133 @@
+import pandas as pd
+import json
+from typing import List, Dict, Any, Tuple
+import os
+
+
class DataManager:
    """Uniform loading, validation and pre-processing of transaction data."""

    # Field mapping: canonical name -> accepted aliases (first match wins).
    FIELD_MAPPING = {
        'txId': ['txId', 'transaction_id', '交易ID', 'id'],
        'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
        'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
        'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
        'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
        'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
        'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
        'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
        'createdAt': ['createdAt', 'created_at', '创建时间']
    }

    # Fields every record must provide (post-standardization).
    REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']

    # All known aliases, flattened once. PERF: previously this set was
    # rebuilt from FIELD_MAPPING for every record inside load_from_file.
    _ALL_ALIASES = frozenset(
        name for names in FIELD_MAPPING.values() for name in names
    )

    @staticmethod
    def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
        """Load a JSON array of records, standardizing field aliases.

        :param file_path: path to a JSON file containing a list of dicts
        :return: (standardized records, type-optimized DataFrame)
        :raises FileNotFoundError: when file_path does not exist
        :raises ValueError: when the JSON is not a non-empty array
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")

        with open(file_path, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        if not isinstance(raw_data, list):
            raise ValueError("JSON文件内容必须是数组格式")

        if not raw_data:
            raise ValueError("数据文件为空")

        # Rename aliased fields to their canonical names; keep extras as-is.
        standardized_data = []
        for record in raw_data:
            standardized_record = {}
            for std_field, possible_names in DataManager.FIELD_MAPPING.items():
                for name in possible_names:
                    if name in record:
                        standardized_record[std_field] = record[name]
                        break

            # Preserve fields that are not aliases of any canonical field.
            for key, value in record.items():
                if key not in DataManager._ALL_ALIASES:
                    standardized_record[key] = value

            standardized_data.append(standardized_record)

        # Build a DataFrame with properly typed columns.
        df = pd.DataFrame(standardized_data)
        df = DataManager._optimize_dataframe(df)

        return standardized_data, df

    @staticmethod
    def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Coerce known columns to appropriate dtypes (in place on df)."""

        # Date column -> datetime.date (invalid values become NaT/None).
        if 'txDate' in df.columns:
            df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date

        # Time stays a plain string.
        if 'txTime' in df.columns:
            df['txTime'] = df['txTime'].astype(str)

        # Monetary columns -> numeric (invalid values become NaN).
        for col in ['txAmount', 'txBalance']:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # Creation timestamp -> datetime.
        if 'createdAt' in df.columns:
            df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')

        # Direction is a small closed set -> categorical saves memory.
        if 'txDirection' in df.columns:
            df['txDirection'] = df['txDirection'].astype('category')

        return df

    @staticmethod
    def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """Validate the data set schema.

        Only the first record is inspected for required fields (assumes a
        homogeneous record shape).

        :return: (is_valid, list of error messages)
        """
        errors = []

        if not data:
            return False, ["数据集为空"]

        # Check required fields on the first record.
        first_record = data[0]
        missing_fields = [
            field for field in DataManager.REQUIRED_FIELDS
            if field not in first_record
        ]

        if missing_fields:
            errors.append(f"缺少必需字段: {', '.join(missing_fields)}")

        return len(errors) == 0, errors

    @staticmethod
    def format_data_summary(data: List[Dict[str, Any]]) -> str:
        """Build a one-line human-readable summary of the data set."""
        if not data:
            return "数据集为空"

        df = pd.DataFrame(data)
        summary = []

        summary.append(f"记录总数: {len(data)}")

        if 'txDate' in df.columns:
            summary.append(f"日期范围: {df['txDate'].min()} 至 {df['txDate'].max()}")

        if 'txAmount' in df.columns:
            summary.append(f"金额范围: {df['txAmount'].min()} 至 {df['txAmount'].max()}")

        if 'txDirection' in df.columns:
            direction_counts = df['txDirection'].value_counts().to_dict()
            summary.append(f"收支分布: {direction_counts}")

        return " | ".join(summary)

+ 159 - 0
llmops/agents/graph.py

@@ -0,0 +1,159 @@
+from langgraph.graph import StateGraph, START, END
+from langgraph.checkpoint.memory import MemorySaver
+from llmops.agents.state import AgentState, convert_numpy_types
+from llmops.agents.planning_agent import planning_node
+from llmops.agents.outline_agent import outline_node
+from llmops.agents.metrics_agent import metrics_node
+
+
def create_report_generation_graph():
    """Build and compile the report-generation graph.

    Planning is the hub: after each worker node (outline, metrics,
    clarification) control returns to planning_node, which decides the next
    step via route_from_planning until report_compiler ends the run.
    """

    workflow = StateGraph(AgentState)

    # Nodes: planner hub plus the worker/terminal nodes.
    workflow.add_node("planning_node", planning_node)
    workflow.add_node("outline_generator", outline_node)
    workflow.add_node("metrics_calculator", metrics_node)
    workflow.add_node("report_compiler", compile_final_report)
    workflow.add_node("clarify_node", handle_clarification)

    # Entry point.
    workflow.add_edge(START, "planning_node")

    # Conditional edges: route based on what the planner decided.
    workflow.add_conditional_edges(
        "planning_node",
        route_from_planning,
        {
            "outline_generator": "outline_generator",
            "metrics_calculator": "metrics_calculator",
            "report_compiler": "report_compiler",
            "clarify_node": "clarify_node",
            "planning_node": "planning_node",  # loop back for another planning step
            "END": END
        }
    )

    # Workers return to the planner for the next decision.
    workflow.add_edge("outline_generator", "planning_node")
    workflow.add_edge("metrics_calculator", "planning_node")
    workflow.add_edge("clarify_node", "planning_node")

    # Compiling the report terminates the graph.
    workflow.add_edge("report_compiler", END)

    # Compile with an in-memory checkpointer; no interrupts configured.
    return workflow.compile(
        checkpointer=MemorySaver(),
        interrupt_before=[],
        interrupt_after=[]
    )
+
+
def route_from_planning(state: "AgentState") -> str:
    """
    Decide the next node after a planning step.

    Order of checks: hard step cap -> missing outline -> missing metric
    requirements -> soft step cap with partial coverage -> coverage < 80%
    computes more metrics, otherwise compile the report.

    :param state: planning state (read-only here)
    :return: target node name
    """
    # CONSISTENCY FIX: read every key defensively with .get() — the original
    # mixed state["computed_metrics"] / state['planning_step'] (KeyError on a
    # partial state) with .get() reads elsewhere in the same function.
    step = state.get("planning_step", 0)
    outline = state.get("outline_draft")
    requirements = state.get("metrics_requirements", [])
    computed = state.get("computed_metrics", {})

    print(f"\n🔍 [路由决策] 步骤={step}, "
          f"大纲版本={state.get('outline_version', 0)}, "
          f"大纲已生成={outline is not None}, "
          f"指标需求={len(requirements)}, "
          f"已计算={len(computed)}")

    # Hard cap: prevent infinite planning loops.
    if step > 50:
        print("⚠️ 规划步骤超过50次,强制终止并生成报告")
        return "report_compiler"

    # No outline yet -> generate one.
    if not outline:
        print("→ 路由到 outline_generator(大纲为空)")
        return "outline_generator"

    # Outline exists but produced no metric requirements -> regenerate.
    if not requirements:
        print("→ 路由到 outline_generator(指标需求为空)")
        return "outline_generator"

    # Coverage of required metrics by computed ones (requirements non-empty here).
    coverage = len(computed) / len(requirements)

    print(f"  指标覆盖率 = {len(computed)}/{len(requirements)} = {coverage:.2%}")

    # Soft cap: with many steps and majority coverage, stop early.
    if step > 30 and coverage > 0.5:
        print(f"→ 路由到 report_compiler(步骤过多,强制终止,覆盖率={coverage:.2%})")
        return "report_compiler"

    # Below the 80% threshold -> compute more metrics.
    if coverage < 0.8:
        print(f"→ 路由到 metrics_calculator(覆盖率={coverage:.2%} < 80%)")
        return "metrics_calculator"

    # Enough coverage -> compile the final report.
    print(f"→ 路由到 report_compiler(覆盖率={coverage:.2%} ≥ 80%)")
    return "report_compiler"
+
+
def compile_final_report(state: AgentState) -> AgentState:
    """Report compilation node: assemble outline + computed metrics into the
    final report structure and mark the run successful."""

    # The outline may be a Pydantic model or a plain dict; normalize to dict.
    # NOTE(review): .dict() is the Pydantic v1 API (model_dump() in v2) —
    # confirm which Pydantic version the outline model uses.
    outline = state["outline_draft"]
    if hasattr(outline, 'dict'):
        outline_dict = outline.dict()
    else:
        outline_dict = outline

    metrics = state["computed_metrics"]

    # Organize metric values under their owning sections; metrics that were
    # never computed are reported as "数据缺失" (data missing).
    sections = []
    for section in outline_dict["sections"]:
        section_metrics = {
            mid: metrics.get(mid, "数据缺失")
            for mid in section["metrics_needed"]
        }
        sections.append({
            "title": section["title"],
            "description": section["description"],
            "metrics": section_metrics
        })

    final_report = {
        "title": outline_dict["report_title"],
        "sections": sections,
        "summary": {
            "total_metrics": len(metrics),
            "required_metrics": len(outline_dict["global_metrics"]),
            "coverage_rate": float(state["completeness_score"]),
            "planning_iterations": int(state["planning_step"])
        }
    }

    result_state = {
        **state,
        "answer": final_report,
        "status": "success",
        "messages": state["messages"] + [("ai", f"🎉 报告生成完成:{outline_dict['report_title']}")]
    }

    # Normalize numpy scalar types before handing the state back to the graph
    # (LangGraph checkpointing chokes on raw numpy values).
    return convert_numpy_types(result_state)
+
+
def handle_clarification(state: AgentState) -> AgentState:
    """Clarification node: flag the run as needing user input and finish."""
    updated = dict(state)
    updated.update(
        status="clarifying",
        is_complete=True,
        answer="需要更多信息,请明确您的报告需求",
    )

    # Normalize numpy scalar types before returning the state to the graph.
    return convert_numpy_types(updated)

+ 309 - 0
llmops/agents/main.py

@@ -0,0 +1,309 @@
+import asyncio
+import json
+import os
+import sys
+from typing import List, Dict, Any
+from llmops.agents.graph import create_report_generation_graph
+from llmops.agents.data_manager import DataManager
+from llmops.agents.state import create_initial_state
+
+# ============= 数据加载方法 =============
+
def load_sample_data() -> List[Dict[str, Any]]:
    """Return a small, hard-coded transaction list in the full field format."""
    records: List[Dict[str, Any]] = []
    records.append({
        "txId": "TX202512310001",
        "txDate": "2025-12-31",
        "txTime": "13:55",
        "txAmount": 3100,
        "txBalance": 245600,
        "txDirection": "收入",
        "txSummary": "中药材服务费",
        "txCounterparty": "康恩贝药业",
        "createdAt": "2025-11-30 05:57",
    })
    records.append({
        "txId": "TX202512310002",
        "txDate": "2025-12-30",
        "txTime": "09:20",
        "txAmount": 15000,
        "txBalance": 242500,
        "txDirection": "收入",
        "txSummary": "货款结算",
        "txCounterparty": "同仁堂医药",
        "createdAt": "2025-11-29 18:23",
    })
    # ... more sample records can be appended here
    return records
+
+
def load_data_from_file(file_path: str) -> List[Dict[str, Any]]:
    """
    Load transaction records from a JSON file and validate their schema.

    Args:
        file_path: Full path to the JSON file.

    Returns:
        The list of transaction records.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the data cannot be parsed or fails schema validation.
    """
    print(f"📁 加载数据文件: {file_path}")
    try:
        data, df = DataManager.load_from_file(file_path)
    except FileNotFoundError:
        # Let a missing file propagate as-is so callers can distinguish it
        # from malformed data (the original wrapped it into ValueError,
        # contradicting this docstring).
        raise
    except Exception as e:
        raise ValueError(f"数据加载失败: {e}") from e

    # Validate data quality; the message composition matches the original
    # ("数据加载失败: 数据验证失败: ...").
    is_valid, errors = DataManager.validate_data_schema(data)
    if not is_valid:
        raise ValueError(f"数据加载失败: 数据验证失败: {'; '.join(errors)}")

    # Print a short summary of the loaded data set.
    print(DataManager.format_data_summary(data))

    return data
+
+
def create_sample_file():
    """Write the built-in sample transactions to ``sample_transactions.json``."""
    sample_file = "sample_transactions.json"
    payload = load_sample_data()

    with open(sample_file, 'w', encoding='utf-8') as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)

    print(f"✅ 示例文件已创建: {sample_file}")
    print(f"📍 文件位置: {os.path.abspath(sample_file)}")
+
+
async def run_report_generation(
        requirement: str,
        data: List[Dict[str, Any]],
        session_id: str = None
) -> Dict[str, Any]:
    """Run the end-to-end report generation flow.

    Args:
        requirement: Natural-language description of the desired report.
        data: Transaction records (list of dicts in the full field format).
        session_id: Optional session identifier; derived from the requirement
            hash when omitted.

    Returns:
        The generated report (the dict stored under the graph's ``answer`` key).

    Raises:
        ValueError: If the data fails schema validation, the graph finishes
            without producing an ``answer``, or the report carries an error.
    """

    # Validate the input data before starting the (expensive) graph run.
    print(f"\n🔍 验证数据格式...")
    is_valid, errors = DataManager.validate_data_schema(data)
    if not is_valid:
        raise ValueError(f"数据验证失败: {'; '.join(errors)}")

    # 1. Build the initial state — create_initial_state also sanitises types.
    if session_id is None:
        session_id = f"report_session_{hash(requirement) % 10000}"

    initial_state = create_initial_state(
        question=requirement,
        data=data,
        session_id=session_id
    )

    # 2. Build the LangGraph state machine.
    graph = create_report_generation_graph()

    # 3. Runtime configuration: thread id for checkpointing; recursion limit
    #    raised because the planner may loop several times.
    config = {
        "configurable": {"thread_id": initial_state["session_id"]},
        "recursion_limit": 50  # raised to 50
    }

    print(f"\n🚀 开始报告生成流程 (会话: {session_id})...\n")

    # 4. Execute the graph and stream per-node updates to stdout.
    step = 1
    try:
        async for event in graph.astream(initial_state, config, stream_mode="updates"):
            node_name = list(event.keys())[0]
            node_output = event[node_name]

            if "messages" in node_output and node_output["messages"]:
                message = node_output["messages"][-1]
                if isinstance(message, tuple):
                    role, content = message
                    print(f"📍 Step {step:02d} | {node_name}: {content}")

                    # Progress tracking: required / computed / pending metric ids.
                    if "metrics_requirements" in node_output:
                        reqs = node_output["metrics_requirements"]
                        print(f"   📊 指标需求: {[m.metric_id for m in reqs]}")

                    if "computed_metrics" in node_output:
                        computed = node_output["computed_metrics"]
                        print(f"   ✅ 已计算: {list(computed.keys())}")

                    if "pending_metric_ids" in node_output:
                        pending = node_output["pending_metric_ids"]
                        print(f"   ⏳ 待计算: {pending}")
                elif isinstance(message, str):
                    print(f"📍 Step {step:02d} | {node_name}: {message}")

            step += 1

            # Safety valve: bail out of a runaway graph loop.
            if step > 100:
                print("⚠️ 达到最大步数限制(100),强制终止")
                break

    except Exception as e:
        print(f"❌ 图执行异常: {e}")
        import traceback
        traceback.print_exc()
        raise

    # 5. Give checkpointing a moment to settle, then read the final state.
    await asyncio.sleep(0.5)
    final_state = await graph.aget_state(config)

    # 6. Debug output and error handling.
    print(f"\nDebug: 最终状态键 = {list(final_state.values.keys())}")
    print(f"Debug: is_complete = {final_state.values.get('is_complete')}")
    print(f"Debug: 执行步数 = {step - 1}")

    # Fail loudly if the graph finished without producing an answer.
    if "answer" not in final_state.values:
        print(f"❌ 错误:最终状态中缺少 'answer' 键")
        print(f"大纲状态: {final_state.values.get('outline_draft')}")
        print(f"最后路由: {final_state.values.get('next_route')}")
        raise ValueError(
            f"报告生成未完成:缺少 'answer' 键\n"
            f"最后路由: {final_state.values.get('next_route')}\n"
            f"大纲草稿: {final_state.values.get('outline_draft') is not None}\n"
            f"已计算指标: {len(final_state.values.get('computed_metrics', {}))}"
        )

    report = final_state.values["answer"]

    # 7. Reject reports that carry an explicit error payload.
    if isinstance(report, dict) and "error" in report:
        raise ValueError(f"报告生成出错: {report['error']}")

    return report
+
+
def print_usage():
    """Print the command-line usage help to stdout."""
    usage_text = """
使用说明:
  python main.py              # 运行主程序(需配置文件路径)
  python main.py --create-sample  # 创建示例数据文件
  python main.py --help       # 显示帮助信息
  python main.py --validate file.json  # 验证数据文件格式

数据文件要求:
  JSON数组,每条记录包含以下字段:
  - txId: 交易ID (字符串)
  - txDate: 交易日期 (YYYY-MM-DD)
  - txTime: 交易时间 (HH:MM)
  - txAmount: 交易金额 (数值)
  - txBalance: 交易后余额 (数值)
  - txDirection: 交易方向 (收入/支出)
  - txSummary: 交易摘要 (字符串)
  - txCounterparty: 交易对手 (字符串)
  - createdAt: 记录创建时间 (YYYY-MM-DD HH:MM:SS)
    """
    print(usage_text)
+
+
def validate_file(file_path: str):
    """Validate a data file; return True on success, False otherwise."""
    print(f"🔍 验证文件: {file_path}")
    try:
        load_data_from_file(file_path)
    except Exception as e:
        print(f"❌ 验证失败: {e}")
        return False
    print("✅ 文件验证通过!")
    return True
+
+
def handle_command_line():
    """Dispatch on command-line arguments.

    No arguments runs the interactive main flow; ``--create-sample`` writes a
    sample data file; ``--help`` prints usage; ``--validate <file>`` checks a
    data file's schema; anything else prints a hint.
    """
    # ``sys`` is imported at module level; the original shadowed it with a
    # redundant function-local ``import sys``, removed here.
    if len(sys.argv) == 1:
        asyncio.run(main())
    elif sys.argv[1] == "--create-sample":
        create_sample_file()
    elif sys.argv[1] == "--help":
        print_usage()
    elif sys.argv[1] == "--validate" and len(sys.argv) > 2:
        validate_file(sys.argv[2])
    else:
        print("未知参数,使用 --help 查看帮助")
+
+
async def main():
    """Interactive entry point: load transaction data, generate a report, save it."""

    print("=" * 70)
    print("  交易流水报告生成系统 (支持完整字段格式)")
    print("=" * 70)

    # ===== Configuration =====
    use_sample_data = False  # True: built-in sample data; False: load from a real file

    if use_sample_data:
        print("\n📊 使用内置示例数据...")
        data = load_sample_data()
        requirement = "生成2025年12月交易分析报告,重点关注收入支出分布、主要交易对手、余额变化趋势和大额交易"

    else:
        # Change this to your own file path.
        file_path = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json"

        # Offer to create a sample file when the configured one is missing.
        if not os.path.exists(file_path):
            print(f"\n⚠️ 文件不存在: {file_path}")
            response = input("是否创建示例文件? (y/n): ")
            if response.lower() == 'y':
                create_sample_file()
                file_path = "sample_transactions.json"
            else:
                print("\n💡 提示: 请将你的JSON文件路径赋值给 file_path 变量")
                return

        try:
            print(f"\n📁 从文件加载数据: {file_path}")
            data = load_data_from_file(file_path)
            requirement = input("\n请输入报告需求: ") or \
                          "生成交易分析报告,包含收入趋势、主要客户、大额交易和余额分析"

        except Exception as e:
            print(f"\n❌ 数据加载失败: {e}")
            return

    # Run the report-generation pipeline.
    try:
        report = await run_report_generation(
            requirement=requirement,
            data=data,
            session_id="demo_session_001"
        )

        # Print the generated report.
        print("\n" + "=" * 70)
        print("📊 最终生成报告")
        print("=" * 70)
        print(json.dumps(report, ensure_ascii=False, indent=2))

        # Persist the report next to the script.
        output_file = "generated_report.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"\n💾 报告已保存到: {os.path.abspath(output_file)}")

    except Exception as e:
        print(f"\n❌ 报告生成失败: {e}")
        import traceback
        traceback.print_exc()
+
+
# Script entry point: dispatch on command-line arguments.
if __name__ == "__main__":
    handle_command_line()

+ 369 - 0
llmops/agents/metrics_agent.py

@@ -0,0 +1,369 @@
+from typing import Dict, Any, List, Optional
+from langchain_openai import ChatOpenAI
+from pydantic import BaseModel, Field
+import pandas as pd
+import asyncio
+import json
+import traceback
+import re
+import types
+
+from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
+from llmops.agents.datadev.llm import get_llm
+
+
class MetricComputationResult(BaseModel):
    """Result of computing a single metric."""
    metric_id: str                   # id of the metric this result belongs to
    value: Any                       # computed value (number/list/dict), or None on failure
    status: str                      # "success" or "failed"
    computation_code: str = ""       # the Pandas snippet that was executed
    error: Optional[str] = None      # error description when status != "success"
+
+
class MetricsCalculator:
    """Metric-computation agent: generates and safely executes Pandas snippets.

    For well-known metric ids a built-in code template is used directly;
    otherwise the LLM is asked to produce a small Pandas snippet whose final
    statement assigns the metric value to ``result``.
    """

    def __init__(self, llm):
        # llm must expose ``ainvoke`` (LangChain chat-model interface).
        self.llm = llm

    def _extract_json_from_response(self, content: str) -> dict:
        """Extract a JSON object from an LLM reply, tolerating Markdown fences.

        Tries, in order: a ```json fenced block, a bare ``` fenced block, the
        whole payload as JSON, then the substring between the first '{' and
        the last '}'.

        Raises:
            ValueError: If no parseable JSON object can be found.
        """
        content = content.strip()

        # ```json ... ``` fenced block
        json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
        if json_match:
            return json.loads(json_match.group(1))

        # ``` ... ``` fenced block without a language tag
        code_match = re.search(r'```\s*(\{.*?\})\s*```', content, re.DOTALL)
        if code_match:
            return json.loads(code_match.group(1))

        # Bare JSON payload
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            pass

        # Last resort: substring between the outermost braces
        first_brace = content.find('{')
        last_brace = content.rfind('}')
        if first_brace != -1 and last_brace != -1 and last_brace > first_brace:
            try:
                return json.loads(content[first_brace:last_brace + 1])
            except json.JSONDecodeError:
                pass

        raise ValueError(f"无法从响应中提取有效JSON: {content[:200]}...")

    def _execute_code_safely(self, code: str, df: pd.DataFrame) -> Any:
        """
        Execute a generated snippet inside a restricted function scope.

        Wrapping the snippet in a synthetic ``def`` makes the names it binds
        ordinary function locals, which list comprehensions can see (unlike
        exec-into-dict semantics). The snippet must assign ``result``.

        Fix over the original: every line of ``code`` is indented, so
        multi-line snippets no longer raise IndentationError (the original
        f-string only indented the first line).

        Raises:
            ValueError: If compilation or execution of the snippet fails.
        """
        # Restricted execution environment: pandas, a copy of the data frame,
        # and a whitelist of harmless builtins.
        globals_dict = {
            "pd": pd,
            "df": df.copy(),
            "__builtins__": {
                "abs": abs, "sum": sum, "len": len, "str": str, "int": int, "float": float,
                "list": list, "dict": dict, "set": set, "tuple": tuple,
                "min": min, "max": max, "round": round, "range": range,
                "sorted": sorted, "enumerate": enumerate, "zip": zip,
            }
        }

        safe_globals = globals_dict.copy()

        # Indent *every* line of the snippet so multi-line code stays inside
        # the synthetic function body.
        indented_code = "\n".join("    " + line for line in code.splitlines())

        func_code = (
            "def _compute_metric():\n"
            + indented_code
            + "\n    return result\n"
        )

        try:
            # Compile, then execute the definition to obtain the function.
            compiled = compile(func_code, "<string>", "exec")
            local_ns = {}
            exec(compiled, safe_globals, local_ns)
            compute_func = local_ns["_compute_metric"]
            result = compute_func()

            # Convert numpy scalars/arrays immediately so the graph state
            # stays JSON-serialisable.
            return convert_numpy_types(result)

        except Exception as e:
            raise ValueError(f"代码执行失败: {str(e)}\n代码内容: {code}")

    async def compute_metric(
            self,
            metric: MetricRequirement,
            df: pd.DataFrame
    ) -> MetricComputationResult:
        """Compute a single metric.

        Resolution order: a built-in code template keyed by ``metric_id``;
        otherwise LLM-generated code; otherwise keyword-matched fallback
        templates. The chosen snippet runs via :meth:`_execute_code_safely`.
        """

        available_fields = df.columns.tolist()

        # Built-in templates for common metrics — avoids an LLM round-trip
        # and raises the overall success rate.
        builtin_templates = {
            "total_income": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()",
            "total_expense": "df_filtered = df[df['txDirection']=='支出']; result = abs(df_filtered['txAmount'].sum())",
            "net_income": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - expense",
            "net_profit": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - expense",
            "balance_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_daily = df.groupby(df['txDate'].dt.date)['txBalance'].last().reset_index(); result = {'dates': df_daily['txDate'].astype(str).tolist(), 'balances': df_daily['txBalance'].tolist()}",
            "top_income_sources": "df_income = df[df['txDirection']=='收入']; top_client = df_income.groupby('txCounterparty')['txAmount'].sum().idxmax(); result = {top_client: df_income.groupby('txCounterparty')['txAmount'].sum().max()}",
            "top_expense_categories": "df_expense = df[df['txDirection']=='支出']; top_cat = df_expense.groupby('txSummary')['txAmount'].sum().idxmax(); result = {top_cat: df_expense.groupby('txSummary')['txAmount'].sum().max()}",
            "income_sources_breakdown": "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()",
            "expense_categories_breakdown": "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()",
            "monthly_income_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_income = df[df['txDirection']=='收入']; df_income['month'] = df_income['txDate'].dt.to_period('M'); result = df_income.groupby('month')['txAmount'].sum().to_dict()",
            "monthly_expense_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_expense = df[df['txDirection']=='支出']; df_expense['month'] = df_expense['txDate'].dt.to_period('M'); result = df_expense.groupby('month')['txAmount'].sum().to_dict()",
        }

        # Use the template directly when the metric id is known.
        if metric.metric_id in builtin_templates:
            code = builtin_templates[metric.metric_id]
            print(f"   📦 使用内置模板: {metric.metric_id}")
        else:
            # Otherwise ask the LLM to generate the computation code.
            prompt = f"""你是数据分析执行器。根据计算逻辑,生成并执行 Pandas/Python 代码。

数据信息:
- 字段: {available_fields}
- 行数: {len(df)}
- 指标名称: {metric.metric_name}
- 指标ID: {metric.metric_id}
- 计算逻辑: {metric.calculation_logic}

要求:
1. 生成可执行的 Pandas/Python 代码(只操作 df 变量)
2. 代码必须安全,禁止系统调用
3. **最终结果必须赋值给变量 `result`**(这是强制要求)
4. `result` 可以是:数值、列表、字典、DataFrame
5. 如果无法计算,将 error 字段设为具体原因

**关键约束**:
- 代码最后一行必须是 `result = ...`
- 确保 `result` 变量在执行后被定义
- 不要包含任何解释性文字,只输出JSON
- **重要**:避免在列表推导式中引用外部变量,所有变量必须在同一作用域定义

输出格式(必须返回纯JSON,不要Markdown代码块):
{{
  "metric_id": "{metric.metric_id}",
  "value": <计算结果>,
  "status": "success",
  "computation_code": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()"
}}

如果无法计算:
{{
  "metric_id": "{metric.metric_id}",
  "value": null,
  "status": "error",
  "error": "具体错误原因",
  "computation_code": ""
}}

**必须确保代码执行后,`result` 变量被定义!**"""

            try:
                # Call the LLM and parse its (possibly fenced) JSON reply.
                response = await self.llm.ainvoke(prompt)

                print(f"\n   🤖 LLM 原始响应: {response.content[:200]}...")

                response_dict = self._extract_json_from_response(response.content)

                code = response_dict.get("computation_code", "")
                print(f"   💻 生成代码: {code}")

                if not code:
                    # The LLM produced no code — try keyword-matched templates.
                    if "收入" in metric.metric_name and "分类" in metric.metric_name:
                        code = "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txSummary')['txAmount'].sum().to_dict()"
                    elif "支出" in metric.metric_name and "分类" in metric.metric_name:
                        code = "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
                    elif "收入" in metric.metric_name and ("top" in metric.metric_name or "主要" in metric.metric_name):
                        code = "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()"
                    elif "支出" in metric.metric_name and ("top" in metric.metric_name or "主要" in metric.metric_name):
                        code = "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
                    else:
                        raise ValueError("LLM 未生成计算代码,且无法匹配内置模板")

            except Exception as e:
                # The LLM call itself failed — fall back to the simplest template.
                print(f"   ⚠️ LLM调用失败,使用回退模板: {e}")
                if "收入" in metric.metric_name:
                    code = "df_income = df[df['txDirection']=='收入']; result = df_income['txAmount'].sum()"
                elif "支出" in metric.metric_name:
                    code = "df_expense = df[df['txDirection']=='支出']; result = abs(df_expense['txAmount'].sum())"
                else:
                    code = "result = None"

        try:
            # Run the chosen snippet in the sandboxed executor.
            computed_value = self._execute_code_safely(code, df)
            print(f"   ✅ 计算结果: {computed_value}")

            return MetricComputationResult(
                metric_id=metric.metric_id,
                value=computed_value,
                status="success",
                computation_code=code
            )

        except Exception as e:
            # Record a detailed failure result instead of raising.
            error_msg = f"指标 {metric.metric_id} 计算失败: {str(e)}"
            print(f"   ❌ {error_msg}")

            return MetricComputationResult(
                metric_id=metric.metric_id,
                value=None,
                status="failed",
                error=error_msg,
                computation_code=code
            )

    async def compute_batch(
            self,
            metrics: List[MetricRequirement],
            df: pd.DataFrame,
            max_concurrent: int = 3
    ) -> List[MetricComputationResult]:
        """Compute several metrics concurrently, capped at ``max_concurrent``."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def compute_with_semaphore(metric):
            async with semaphore:
                return await self.compute_metric(metric, df)

        tasks = [compute_with_semaphore(m) for m in metrics]
        return await asyncio.gather(*tasks)
+
+
async def metrics_node(state: AgentState) -> AgentState:
    """Metric-computation graph node.

    Extracts the pending metric requirements from the state, computes up to
    ``batch_size`` of them via ``MetricsCalculator``, records successes in
    ``computed_metrics`` and failures in ``failed_metric_attempts``, updates
    the coverage score, and returns the sanitised state.
    """

    if not state["data_set"]:
        new_state = state.copy()
        new_state["messages"].append(("ai", "❌ 错误:数据集为空"))
        # Strip numpy scalar types before returning the state to the graph.
        return convert_numpy_types(new_state)

    df = pd.DataFrame(state["data_set"])

    # Work out which metrics still need computing.
    pending_ids = state.get("pending_metric_ids", [])
    computed_ids = set(state["computed_metrics"].keys())
    required_metrics = state["metrics_requirements"]

    pending_metrics = [
        m for m in required_metrics
        if m.metric_id not in computed_ids
    ]

    # Honour an explicit pending-id list from the planner, if any.
    if pending_ids:
        valid_ids = [m.metric_id for m in pending_metrics]
        pending_metrics = [
            m for m in pending_metrics
            if m.metric_id in pending_ids and m.metric_id in valid_ids
        ]

    if not pending_metrics:
        coverage = len(computed_ids) / len(required_metrics) if required_metrics else 1

        # Everything computed — mark the node chain complete and route on.
        if coverage >= 1.0:
            new_state = state.copy()
            new_state["messages"].append(("ai", "✅ 所有指标已计算完成"))
            new_state["completeness_score"] = 1.0
            new_state["is_complete"] = True
            new_state["next_route"] = "report_compiler"
            # Strip numpy scalar types before returning the state to the graph.
            return convert_numpy_types(new_state)

        # Coverage < 100% but the filtered list was empty: rebuild the
        # pending list from scratch.
        pending_metrics = [
            m for m in required_metrics
            if m.metric_id not in computed_ids
        ]

        if not pending_metrics:
            raise ValueError("无法提取待计算指标")

    print(f"\n📊 待计算指标: {[m.metric_id for m in pending_metrics]}")

    # Cap the number of metrics computed per node invocation.
    batch_size = 5
    metrics_to_compute = pending_metrics[:batch_size]

    print(f"🧮 本次计算 {len(metrics_to_compute)} 个指标")

    # Compute the batch with bounded concurrency.
    llm = get_llm()
    calculator = MetricsCalculator(llm)

    results = await calculator.compute_batch(metrics_to_compute, df)

    # Fold the results back into the state.
    new_state = state.copy()
    success_count = 0

    for r in results:
        if r.status == "success":
            new_state["computed_metrics"][r.metric_id] = r.value
            success_count += 1
        else:
            print(f"⚠️ 指标 {r.metric_id} 失败: {r.error[:100]}...")
            new_state["messages"].append(
                ("ai", f"⚠️ 指标计算失败 {r.metric_id}: {r.error[:100]}...")
            )
            # Track how many times each metric has failed.
            if "failed_metric_attempts" not in new_state:
                new_state["failed_metric_attempts"] = {}
            new_state["failed_metric_attempts"][r.metric_id] = new_state.get("failed_metric_attempts", {}).get(
                r.metric_id, 0) + 1

    # Update the pending-id list.
    # NOTE(review): this assumes the successful results are the *first*
    # ``success_count`` entries of the batch; with a mid-batch failure a
    # later success could be re-queued while the failure is dropped —
    # confirm this is intended.
    remaining_ids = [m.metric_id for m in pending_metrics[success_count:]]
    new_state["pending_metric_ids"] = remaining_ids

    # If the whole batch failed, skip those metrics to avoid an infinite
    # retry loop.
    if success_count == 0 and len(metrics_to_compute) > 0:
        new_state["messages"].append(
            ("ai", f"⚠️ {len(metrics_to_compute)}个指标全部计算失败,将跳过这些指标")
        )
        failed_ids = [m.metric_id for m in metrics_to_compute]
        new_state["pending_metric_ids"] = [mid for mid in remaining_ids if mid not in failed_ids]

    print(f"✅ 成功 {success_count}/{len(results)},剩余 {len(new_state['pending_metric_ids'])} 个指标")

    # Re-evaluate coverage after this batch.
    coverage = len(new_state["computed_metrics"]) / len(required_metrics)
    new_state["completeness_score"] = coverage

    new_state["messages"].append(
        ("ai", f"🧮 计算完成 {success_count}/{len(results)} 个指标,覆盖率 {coverage:.2%}")
    )

    # Strip numpy scalar types before returning the state to the graph.
    return convert_numpy_types(new_state)

+ 234 - 0
llmops/agents/outline_agent.py

@@ -0,0 +1,234 @@
+from typing import List, Dict, Any
+from langchain_openai import ChatOpenAI
+from langchain_core.prompts import ChatPromptTemplate
+import json  # 确保导入json
+import uuid
+
+from llmops.agents.state import AgentState, ReportOutline, ReportSection, MetricRequirement, convert_numpy_types
+from llmops.agents.datadev.llm import get_llm
+
+
class OutlineGenerator:
    """Outline-generation agent: turns a report requirement into a structured outline.

    Binds the LLM to the ``ReportOutline`` schema for structured output, then
    post-processes the result to back-fill any missing ids and fields.
    """

    def __init__(self, llm):
        # Bind the LLM to the ReportOutline schema for structured output.
        self.llm = llm.with_structured_output(ReportOutline)

    def create_prompt(self, question: str, sample_data: List[Dict]) -> str:
        """Build the outline-generation prompt from the requirement and sample rows."""

        available_fields = list(sample_data[0].keys()) if sample_data else []
        sample_str = json.dumps(sample_data[:2], ensure_ascii=False, indent=2)

        # Detailed field specification plus a worked example keeps the LLM's
        # JSON output complete and well-formed.
        return f"""你是银行流水报告大纲专家。根据用户需求和样本数据,生成专业、可执行的报告大纲。

需求分析:
{question}

可用字段:
{', '.join(available_fields)}

样本数据:
{sample_str}

输出要求(必须生成有效的JSON):
1. report_title: 报告标题(字符串)
2. sections: 章节列表,每个章节必须包含:
   - section_id: 章节唯一ID(如"sec_1", "sec_2")
   - title: 章节标题
   - description: 章节描述
   - metrics_needed: 所需指标ID列表(字符串数组,可为空)
3. global_metrics: 全局指标列表,每个指标必须包含:
   - metric_id: 指标唯一ID(如"total_income", "avg_balance")
   - metric_name: 指标名称
   - calculation_logic: 计算逻辑描述
   - required_fields: 所需字段列表
   - dependencies: 依赖的其他指标ID(可为空)

重要提示:
- 必须生成section_id,格式为"sec_1", "sec_2"等
- 必须生成metric_id,格式为字母+下划线+描述
- metrics_needed必须是字符串数组
- 确保所有字段都存在,不能缺失

输出示例:
{{
  "report_title": "2024年第三季度分析报告",
  "sections": [
    {{
      "section_id": "sec_1",
      "title": "收入概览",
      "description": "分析收入总额",
      "metrics_needed": ["total_income", "avg_income"]
    }}
  ],
  "global_metrics": [
    {{
      "metric_id": "total_income",
      "metric_name": "总收入",
      "calculation_logic": "sum of all income transactions",
      "required_fields": ["txAmount", "txDirection"],
      "dependencies": []
    }}
  ]
}}"""

    async def generate(self, state: AgentState) -> ReportOutline:
        """Generate an outline asynchronously, back-filling missing fields afterwards."""
        prompt = self.create_prompt(
            question=state["question"],
            sample_data=state["data_set"][:2]
        )

        messages = [
            ("system", "你是一名专业的报告大纲生成专家,必须输出完整、有效的JSON格式,包含所有必需字段。"),
            ("user", prompt)
        ]

        outline = await self.llm.ainvoke(messages)

        # Back-fill any section_id / metric_id the LLM left empty.
        outline = self._post_process_outline(outline)

        return outline

    def _post_process_outline(self, outline: ReportOutline) -> ReportOutline:
        """
        Post-process the outline, auto-filling required fields the LLM omitted.
        """
        # Back-fill section ids as "sec_<n>".
        for idx, section in enumerate(outline.sections):
            if not section.section_id:
                section.section_id = f"sec_{idx + 1}"

            # metrics_needed must always be a list.
            if not isinstance(section.metrics_needed, list):
                section.metrics_needed = []

        # Back-fill metric ids and dependencies.
        for idx, metric in enumerate(outline.global_metrics):
            if not metric.metric_id:
                metric.metric_id = f"metric_{idx + 1}"

            # dependencies must always be a list.
            if not isinstance(metric.dependencies, list):
                metric.dependencies = []

            # Infer required_fields from the calculation logic when empty.
            if not metric.required_fields:
                metric.required_fields = self._infer_required_fields(
                    metric.calculation_logic
                )

        return outline

    def _infer_required_fields(self, logic: str) -> List[str]:
        """Infer the data fields a metric needs from keywords in its calculation logic."""
        # Keyword → field mapping (keys are Chinese terms expected in the logic text).
        field_mapping = {
            "收入": ["txAmount", "txDirection"],
            "支出": ["txAmount", "txDirection"],
            "余额": ["txBalance"],
            "对手方": ["txCounterparty"],
            "日期": ["txDate"],
            "时间": ["txTime", "txDate"],
            "摘要": ["txSummary"],
            "创建时间": ["createdAt"]
        }

        fields = []
        for keyword, field_list in field_mapping.items():
            if keyword in logic:
                fields.extend(field_list)

        # Deduplicate; note this does not preserve order.
        return list(set(fields))
+
+
async def outline_node(state: AgentState) -> AgentState:
    """Outline-generation node: produce the report outline, or a default on failure.

    Sets ``outline_ready`` so the graph does not regenerate the outline, and
    seeds ``metrics_requirements`` from the outline's global metrics.
    """

    llm = get_llm()
    generator = OutlineGenerator(llm)

    try:
        # Generate the outline asynchronously.
        outline = await generator.generate(state)

        new_state = state.copy()
        new_state["outline_draft"] = outline
        new_state["outline_version"] += 1

        # Guard flag: marks the outline as ready so it is not regenerated.
        new_state["outline_ready"] = True

        new_state["metrics_requirements"] = outline.global_metrics
        new_state["metrics_pending"] = outline.global_metrics.copy()  # metrics awaiting computation
        new_state["messages"].append(
            ("ai", f"✅ 大纲生成完成 v{new_state['outline_version']}:{outline.report_title}")
        )

        print(f"\n📝 大纲已生成:{outline.report_title}")
        print(f"   章节数:{len(outline.sections)}")
        print(f"   指标数:{len(outline.global_metrics)}")

        # Dump the full outline for debugging.
        print("\n" + "=" * 70)
        print("📋 详细大纲内容")
        print("=" * 70)
        # NOTE(review): ``.dict()`` is the Pydantic v1 API (deprecated in v2);
        # prefer ``model_dump()`` when upgrading.
        print(json.dumps(outline.dict(), ensure_ascii=False, indent=2))
        print("=" * 70)

        # Strip numpy scalar types before returning the state to the graph.
        return convert_numpy_types(new_state)

    except Exception as e:
        print(f"⚠️ 大纲生成出错: {e},使用默认结构")

        # Fall back to a minimal default outline so the pipeline can continue.
        default_outline = ReportOutline(
            report_title="默认交易分析报告",
            sections=[
                ReportSection(
                    section_id="sec_1",
                    title="交易概览",
                    description="基础交易情况分析",
                    metrics_needed=["total_transactions", "total_income", "total_expense"]
                )
            ],
            global_metrics=[
                MetricRequirement(
                    metric_id="total_transactions",
                    metric_name="总交易笔数",
                    calculation_logic="count all transactions",
                    required_fields=["txId"],
                    dependencies=[]
                ),
                MetricRequirement(
                    metric_id="total_income",
                    metric_name="总收入",
                    calculation_logic="sum of income transactions",
                    required_fields=["txAmount", "txDirection"],
                    dependencies=[]
                )
            ]
        )

        new_state = state.copy()
        new_state["outline_draft"] = default_outline
        new_state["outline_version"] += 1
        new_state["outline_ready"] = True  # mark ready even for the default outline
        new_state["metrics_requirements"] = default_outline.global_metrics
        new_state["messages"].append(
            ("ai", f"⚠️ 使用默认大纲 v{new_state['outline_version']}")
        )

        # Dump the default outline for debugging.
        print("\n" + "=" * 70)
        print("📋 默认大纲内容")
        print("=" * 70)
        print(json.dumps(default_outline.dict(), ensure_ascii=False, indent=2))
        print("=" * 70)

        # Strip numpy scalar types before returning the state to the graph.
        return convert_numpy_types(new_state)

+ 220 - 0
llmops/agents/planning_agent.py

@@ -0,0 +1,220 @@
+from typing import List, Dict, Optional, Any, Union
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+from pydantic import BaseModel, Field
+from langchain_openai import ChatOpenAI
+import json
+
+from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
+from llmops.agents.datadev.llm import get_llm
+
+
class ActionItem(BaseModel):
    """A single planned action emitted by the planner LLM."""
    action: str = Field(description="动作名称")  # action name
    parameters: Optional[Dict[str, Any]] = Field(default_factory=dict)  # optional keyword arguments for the action
+
+
class ClarificationRequest(BaseModel):
    """Structured clarification request, used when the user's ask is ambiguous."""
    questions: List[str] = Field(description="需要澄清的问题列表")  # questions to put back to the user
    missing_fields: List[str] = Field(default_factory=list, description="缺少的字段或信息")  # fields/info the request lacks
+
+
class PlanningOutput(BaseModel):
    """Planner decision output — tolerant of the flexible shapes the LLM may emit."""
    decision: str = Field(
        description="决策类型: generate_outline, compute_metrics, finalize, clarify"
    )
    reasoning: str = Field(description="详细推理过程")
    next_actions: List[Union[str, ActionItem]] = Field(
        default_factory=list,
        description="下一步动作列表"
    )
    # Key fix: explicitly carry the list of metric ids still to be computed,
    # so the router does not have to re-derive it from free text.
    metrics_to_compute: List[str] = Field(
        default_factory=list,
        description="待计算指标ID列表(如 ['total_income', 'avg_balance'])"
    )
    # May arrive as a dict, a bare list, or a ClarificationRequest model;
    # normalize_additional_requirements() flattens it downstream.
    additional_requirements: Optional[
        Union[Dict[str, Any], List[Any], ClarificationRequest]
    ] = Field(default=None, description="额外需求或澄清信息")
+
+
def normalize_additional_requirements(req: Any) -> Optional[Dict[str, Any]]:
    """Normalize ``PlanningOutput.additional_requirements`` into a plain dict.

    The planner LLM may emit a dict, a bare list of questions, a pydantic
    model (``ClarificationRequest``), or some other scalar.  Downstream code
    (the clarify branch of ``planning_node``) expects a dict with an optional
    ``"questions"`` key, so everything is coerced into that shape.

    Args:
        req: Raw value from the structured LLM output; may be None.

    Returns:
        None when ``req`` is None, otherwise a plain dict.
    """
    if req is None:
        return None

    if isinstance(req, dict):
        return req

    if isinstance(req, list):
        # LLM sometimes returns a bare list of questions; wrap it in the
        # expected dict shape.
        return {
            "questions": [str(item) for item in req],
            "missing_fields": []
        }

    # Bug fix: pydantic models (e.g. ClarificationRequest, explicitly allowed
    # by PlanningOutput's annotation) previously fell through to
    # {"raw": str(req)}, losing the "questions" key that planning_node reads.
    # Duck-type so this works with both pydantic v2 (model_dump) and v1 (dict).
    dump = getattr(req, "model_dump", None) or getattr(req, "dict", None)
    if callable(dump):
        return dump()

    return {"raw": str(req)}
+
+
def create_planning_agent(llm, state: AgentState):
    """Create the planning agent chain (fixed version: JSON examples removed
    from the prompt to avoid prompt-template variable conflicts).

    Builds: system prompt + message history placeholder + user question,
    piped into ``llm`` with structured output parsed as ``PlanningOutput``.

    Args:
        llm: A chat model supporting ``with_structured_output``.
        state: Current agent state.  NOTE(review): currently unused in the
            body; kept in the signature for call-site compatibility.

    Returns:
        A runnable chain producing a ``PlanningOutput``.
    """
    prompt = ChatPromptTemplate.from_messages([
        ("system", """你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。

### 决策选项(四选一)
1. generate_outline:大纲未生成或大纲无效
2. compute_metrics:大纲已生成但指标未完成(覆盖率<80%)
3. finalize:指标覆盖率≥80%,信息充足
4. clarify:用户需求模糊,缺少关键信息

### 决策规则(按顺序检查)
1. 检查 outline_draft 是否为空 → 空则选择 generate_outline
2. 检查 metrics_requirements 是否为空 → 空则选择 generate_outline
3. 计算指标覆盖率 = 已计算指标 / 总需求指标
   - 覆盖率 < 0.8 → 选择 compute_metrics
   - 覆盖率 ≥ 0.8 → 选择 finalize
4. 如果无法理解需求 → 选择 clarify

### 重要原则
- 大纲草稿已存在时,不要重复生成大纲
- 决策为 compute_metrics 时,必须提供具体的指标ID列表
- 确保 metrics_to_compute 是字符串数组格式

### 输出字段说明
- decision: 决策字符串
- reasoning: 决策原因说明
- next_actions: 动作列表(可选)
- metrics_to_compute: 待计算指标ID列表(决策为compute_metrics时必须提供)
- additional_requirements: 额外需求(可选)

必须输出有效的JSON格式!"""),

    MessagesPlaceholder("messages"),

    ("user", "报告需求:{question}\n\n请输出决策结果。")
    ])

    return prompt | llm.with_structured_output(PlanningOutput)
+
+
async def planning_node(state: AgentState) -> AgentState:
    """Planning node: ask the LLM for the next decision, identify the metrics
    still pending, and set ``next_route`` for the graph router.

    Returns a numpy-cleaned copy of the state (see convert_numpy_types).
    """
    llm = get_llm()
    planner = create_planning_agent(llm, state)

    # Build the status-evaluation context for the planner prompt.
    required_count = len(state["metrics_requirements"])
    computed_count = len(state["computed_metrics"])
    coverage = computed_count / required_count if required_count > 0 else 0

    # Track per-metric failure counts so repeatedly failing metrics cannot
    # cause an infinite plan/compute loop.
    failed_attempts = state.get("failed_metric_attempts", {})
    pending_ids = state.get("pending_metric_ids", [])

    # Drop metrics that have already failed too many times.
    max_retry = 3
    filtered_pending_ids = [
        mid for mid in pending_ids
        if failed_attempts.get(mid, 0) < max_retry
    ]

    status_snapshot = f"""当前状态评估:
- 规划步骤: {state['planning_step']}
- 大纲版本: {state['outline_version']}
- 大纲草稿存在: {state['outline_draft'] is not None}
- 指标需求总数: {required_count}
- 已计算指标数: {computed_count}
- 指标覆盖率: {coverage:.2%}
- 待计算指标数: {len(pending_ids)}
- 有效待计算指标数: {len(filtered_pending_ids)}
- 失败尝试记录: {failed_attempts}

建议下一步: {"计算指标" if coverage < 0.8 else "生成报告"}"""

    # Run the planner.
    result = await planner.ainvoke({
        "question": state["question"],
        "messages": [("system", status_snapshot)]
    })

    # Normalize the (flexibly-typed) additional requirements into a dict.
    normalized_req = normalize_additional_requirements(result.additional_requirements)

    # Collect every metric not yet computed.
    computed_ids = set(state["computed_metrics"].keys())
    required_metrics = state["metrics_requirements"]

    pending_metrics = [
        m for m in required_metrics
        if m.metric_id not in computed_ids
    ]

    # Prefer the LLM-returned metric ids; otherwise keep all pending metrics.
    # NOTE(review): this rebinds `pending_ids` (shadowing the filtered value
    # from above) and does not re-apply the max_retry filter — confirm intended.
    if result.metrics_to_compute:
        pending_ids = result.metrics_to_compute
        valid_ids = [m.metric_id for m in pending_metrics]
        pending_metrics = [m for m in pending_metrics if m.metric_id in pending_ids and m.metric_id in valid_ids]

    # Update state. NOTE(review): state.copy() is shallow, so the .append()
    # calls below mutate the caller's plan_history / messages lists in place
    # — verify this is what the LangGraph checkpointing expects.
    new_state = state.copy()
    new_state["plan_history"].append(
        f"Step {new_state['planning_step']}: {result.decision}"
    )
    new_state["planning_step"] += 1
    new_state["additional_requirements"] = normalized_req

    # Persist the pending metric ids (and the full MetricRequirement objects)
    # for the metrics-calculator node.
    if pending_metrics:
        pending_ids = [m.metric_id for m in pending_metrics]
        new_state["pending_metric_ids"] = pending_ids
        new_state["metrics_to_compute"] = pending_metrics  # keep full objects

    # Set the routing flag according to the planner's decision.
    if result.decision == "generate_outline":
        new_state["messages"].append(
            ("ai", f"📋 规划决策:生成大纲 (v{new_state['outline_version'] + 1})")
        )
        new_state["next_route"] = "outline_generator"
    elif result.decision == "compute_metrics":
        # Make sure the reported count is correct.
        if not pending_metrics:
            # No explicit pending list but requirements exist: compute all
            # metrics that have not been computed yet.
            computed_ids = set(state["computed_metrics"].keys())
            pending_metrics = [m for m in required_metrics if m.metric_id not in computed_ids]

        # If no pending metric survives the retry filter but some remain
        # uncomputed, they have all failed too often — skip straight to the
        # report compiler.
        if not filtered_pending_ids and pending_ids:
            new_state["messages"].append(
                ("ai", f"⚠️ 剩余 {len(pending_ids)} 个指标已多次计算失败,将跳过这些指标直接生成报告")
            )
            new_state["next_route"] = "report_compiler"
            # Key fix: clean numpy types before returning the state.
            return convert_numpy_types(new_state)

        new_state["messages"].append(
            ("ai", f"🧮 规划决策:计算 {len(pending_metrics)} 个指标 ({[m.metric_id for m in pending_metrics]})")
        )
        new_state["next_route"] = "metrics_calculator"
    elif result.decision == "finalize":
        new_state["is_complete"] = True
        new_state["messages"].append(
            ("ai", f"✅ 规划决策:信息充足,生成最终报告(覆盖率 {coverage:.2%})")
        )
        new_state["next_route"] = "report_compiler"
    elif result.decision == "clarify":
        questions = []
        if normalized_req and "questions" in normalized_req:
            questions = normalized_req["questions"]

        new_state["messages"].append(
            ("ai", f"❓ 需要澄清:{';'.join(questions) if questions else '请提供更详细的需求'}")
        )
        new_state["next_route"] = "clarify_node"

    # Key fix: clean numpy types before returning the state.
    return convert_numpy_types(new_state)

+ 127 - 0
llmops/agents/state.py

@@ -0,0 +1,127 @@
+from typing import TypedDict, List, Dict, Any, Optional
+from langchain_core.messages import BaseMessage
+from pydantic import BaseModel, Field
+import numpy as np
+
+
+# ============= 数据模型 =============
+
class MetricRequirement(BaseModel):
    """Definition of one metric the report needs computed."""
    metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")  # unique metric id
    metric_name: str = Field(description="指标中文名称")  # human-readable (Chinese) name
    calculation_logic: str = Field(description="计算逻辑描述")  # natural-language calculation description
    required_fields: List[str] = Field(description="所需字段")  # transaction fields the calculation reads
    dependencies: List[str] = Field(default_factory=list)  # metric_ids this metric depends on
+
+
class ReportSection(BaseModel):
    """One section (chapter) of the report outline."""
    section_id: str = Field(description="章节ID")  # section id
    title: str = Field(description="章节标题")  # section title
    description: str = Field(description="章节内容要求")  # content requirements for the section
    metrics_needed: List[str] = Field(description="所需指标ID列表")  # metric_ids this section renders
+
+
class ReportOutline(BaseModel):
    """Complete report outline: title, ordered sections, and global metric needs."""
    report_title: str  # overall report title
    sections: List[ReportSection]  # ordered list of sections
    global_metrics: List[MetricRequirement]  # all metrics required across sections
+
+
+# ============= 序列化工具函数 =============
+
def convert_numpy_types(value: Any) -> Any:
    """Recursively replace numpy scalars and arrays with native Python values.

    Containers (dict/list/tuple/set) are rebuilt element by element; dict keys
    are coerced to ``str``.  The result is safe to serialize (e.g. to JSON or
    a checkpoint store).

    Args:
        value: Arbitrary object, possibly containing numpy types.

    Returns:
        The same structure with only native Python types.
    """
    recurse = convert_numpy_types

    # Containers: rebuild with converted elements (keys stringified for dicts).
    if isinstance(value, dict):
        return {str(key): recurse(item) for key, item in value.items()}
    if isinstance(value, list):
        return [recurse(item) for item in value]
    if isinstance(value, tuple):
        return tuple(recurse(item) for item in value)
    if isinstance(value, set):
        return {recurse(item) for item in value}

    # Numpy scalars and arrays.
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, np.ndarray):
        return recurse(value.tolist())
    if hasattr(value, 'item') and hasattr(value, 'dtype'):
        # Any remaining numpy scalar type: unwrap via .item().
        return recurse(value.item())

    # Already a native Python value.
    return value
+
+
def create_initial_state(question: str, data: List[Dict[str, Any]], session_id: Optional[str] = None) -> Dict[str, Any]:
    """Create the initial agent state with every key the graph nodes read or write.

    Args:
        question: The user's report request.
        data: Raw transaction rows; cleaned of numpy scalar types up front so
            the state stays serializable.
        session_id: Optional conversation id; falls back to "default_session".

    Returns:
        A dict matching ``AgentState``, with all values already numpy-cleaned.
    """
    cleaned_data = convert_numpy_types(data)

    return {
        "question": str(question),
        "data_set": cleaned_data,
        "transactions_df": None,
        "planning_step": 0,
        "plan_history": [],
        "outline_draft": None,
        "outline_version": 0,
        "metrics_requirements": [],
        "computed_metrics": {},
        "metrics_cache": {},
        "pending_metric_ids": [],
        "failed_metric_attempts": {},
        # Consistency fix: planning_node writes these two keys; initialize
        # them here so readers never hit a KeyError before the first
        # planning pass.
        "additional_requirements": None,
        "metrics_to_compute": [],
        "report_draft": {},
        "is_complete": False,
        "completeness_score": 0.0,
        "messages": [],
        "current_node": "start",
        "session_id": str(session_id) if session_id else "default_session",
        "next_route": "planning_node",
        "outline_ready": False,
        "metrics_ready": False,
        "last_decision": "init"
    }
+
+
+# ============= 状态定义 =============
+
class AgentState(TypedDict):
    """Shared graph state passed between all agent nodes."""

    # === Input layer ===
    question: str                          # user's report request
    data_set: List[Dict[str, Any]]         # raw (numpy-cleaned) transaction rows
    transactions_df: Optional[Any]         # lazily-built pandas DataFrame, if any

    # === Planning layer ===
    planning_step: int                     # number of planning iterations so far
    plan_history: List[str]                # log of decisions, one entry per step

    # === Outline layer ===
    outline_draft: Optional[ReportOutline]  # current outline, None until generated
    outline_version: int                    # incremented on every regeneration

    # === Metrics layer ===
    metrics_requirements: List[MetricRequirement]  # all metrics the outline needs
    computed_metrics: Dict[str, Any]               # metric_id -> computed value
    metrics_cache: Dict[str, Any]                  # intermediate calculation cache
    pending_metric_ids: List[str]                  # metric ids still to compute
    failed_metric_attempts: Dict[str, int]         # metric_id -> failure count
    # Consistency fix: planning_node writes these two keys; declare them so
    # the TypedDict matches the state actually produced at runtime.
    metrics_to_compute: List[Any]                  # full MetricRequirement objects pending
    additional_requirements: Optional[Dict[str, Any]]  # normalized extra requirements

    # === Result layer ===
    report_draft: Dict[str, Any]           # section_id -> rendered content
    is_complete: bool                      # planner declared the report finalizable
    completeness_score: float              # heuristic 0..1 completeness metric

    # === Conversation / routing ===
    messages: List[BaseMessage]            # chat history shown to the user
    current_node: str                      # name of the node currently executing
    session_id: str                        # conversation/session identifier
    next_route: str                        # router target set by planning_node
    outline_ready: bool                    # outline generation finished
    metrics_ready: bool                    # metric computation finished
    last_decision: str                     # most recent planner decision

+ 0 - 0
llmops/agents/tools/__init__.py