Sfoglia il codice sorgente

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	llmops/agents/outline_agent.py
#	llmops/agents/planning_agent.py
JiaQiang 1 settimana fa
parent
commit
740c3f723d

+ 2 - 0
.idea/modules.xml

@@ -2,6 +2,8 @@
 <project version="4">
   <component name="ProjectModuleManager">
     <modules>
+      <module fileurl="file://$PROJECT_DIR$/../aireport/.idea/aireport.iml" filepath="$PROJECT_DIR$/../aireport/.idea/aireport.iml" />
+      <module fileurl="file://$USER_HOME$/Downloads/big_agent/.idea/big_agent.iml" filepath="$USER_HOME$/Downloads/big_agent/.idea/big_agent.iml" />
       <module fileurl="file:///Applications/work/宇信科技/智能数据平台/llmops/.idea/llmops.iml" filepath="/Applications/work/宇信科技/智能数据平台/llmops/.idea/llmops.iml" />
       <module fileurl="file://$PROJECT_DIR$/.idea/tx_flow_analysis.iml" filepath="$PROJECT_DIR$/.idea/tx_flow_analysis.iml" />
     </modules>

+ 2 - 0
.idea/tx_flow_analysis.iml

@@ -5,5 +5,7 @@
     <orderEntry type="jdk" jdkName="Python 3.1 (2)" jdkType="Python SDK" />
     <orderEntry type="sourceFolder" forTests="false" />
     <orderEntry type="module" module-name="llmops" />
+    <orderEntry type="module" module-name="aireport" />
+    <orderEntry type="module" module-name="big_agent" />
   </component>
 </module>

+ 2 - 1
README

@@ -8,4 +8,5 @@
 pip install langchain
 pip install langchain_openai
 pip install fastapi
-pip install uvicorn
+pip install uvicorn
+pip install pdfplumber   (pdf交易流水单解析需要)

+ 596 - 0
llmops/agents/chat_bot.py

@@ -0,0 +1,596 @@
+
+from typing import TypedDict, Any, List, Dict
+
+from langgraph.graph import StateGraph, START, END
+from langchain_core.messages import AIMessage, HumanMessage, BaseMessage
+import pandas as pd
+import json
+
+from langgraph.prebuilt import create_react_agent
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+from llmops.agents.datadev.memory.memory_saver_with_expiry2 import MemorySaverWithExpiry
+
+from llmops.agents.datadev.llm import get_llm
+from llmops.agents.tools.unerstand_dataset_tool import understand_dataset_structure
+
+import logging
+
+logger = logging.getLogger(__name__)
+logging.basicConfig(level=logging.DEBUG,
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+                    datefmt='%Y-%m-%d %H:%M:%S')
+
class AgentState(TypedDict):
    """State carried through the ChatBot agent graph.

    Fix: the original declared ``transactions_df`` twice; the duplicate
    annotation has been removed (the second merely overwrote the first).
    """
    question: str                        # user question
    data_set: List[Any]                  # raw dataset records, e.g. [{...}]
    transactions_df: pd.DataFrame        # dataset as a pandas DataFrame
    intent_num: str                      # intent classification id ("1"-"8")
    history_messages: List[BaseMessage]  # conversation history
    bank_type: str                       # detected bank type
    analysis_results: Dict[str, Any]     # analysis results
    current_step: str                    # current processing step
    file_path: str                       # path of the uploaded dataset file
    file_hash: str                       # hash of the dataset file
    status: str                          # "success" | "error"
    answer: Any                          # AI answer payload
+
class TxFlowAnalysisAgent:
    """Transaction-flow (bank statement) analysis agent.

    Wires an intent-recognition LangGraph to per-intent handler nodes and
    keeps conversation state in a TTL-based memory saver.
    """

    def __init__(self):
        self.llm = get_llm()

        # Conversation memory: entries expire after 600s, swept every 60s.
        self.memory = MemorySaverWithExpiry(expire_time=600, clean_interval=60)

        # Build and compile the state graph once at construction time.
        self.graph = self._build_graph()
+
+    def _build_graph(self):
+        # 构造计算图
+        graph_builder = StateGraph(AgentState)
+
+        # 添加节点
+        graph_builder.add_node("_intent_recognition", self._intent_recognition)
+        graph_builder.add_node("_say_hello", self._say_hello)
+        graph_builder.add_node("_intent_clarify", self._intent_clarify)
+        graph_builder.add_node("_beyond_ability", self._beyond_ability)
+        graph_builder.add_node("_data_op", self._data_op)
+        graph_builder.add_node("_gen_report", self._gen_report)
+        graph_builder.add_node("_analysis", self._analysis)
+        graph_builder.add_node("_statistics", self._statistics)
+        graph_builder.add_node("_query", self._query)
+        graph_builder.add_node("_invalid", self._invalid)
+
+        # 添加边
+        graph_builder.add_edge(START, "_intent_recognition")
+        graph_builder.add_edge("_say_hello", END)
+        graph_builder.add_edge("_intent_clarify", END)
+        graph_builder.add_edge("_beyond_ability", END)
+        graph_builder.add_edge("_data_op", END)
+        graph_builder.add_edge("_gen_report", END)
+        graph_builder.add_edge("_analysis", END)
+        graph_builder.add_edge("_statistics", END)
+        graph_builder.add_edge("_query", END)
+        graph_builder.add_edge("_invalid", END)
+
+        # 条件边
+        graph_builder.add_conditional_edges(source="_intent_recognition", path=self._router, path_map={
+            "_say_hello": "_say_hello",
+            "_intent_clarify": "_intent_clarify",
+            "_beyond_ability": "_beyond_ability",
+            "_data_op": "_data_op",
+            "_gen_report": "_gen_report",
+            "_analysis": "_analysis",
+            "_statistics": "_statistics",
+            "_query": "_query",
+            "_invalid": "_invalid"
+        })
+
+        return graph_builder.compile(checkpointer=self.memory)
+
+
+    def _intent_recognition(self, state: AgentState):
+        """
+        根据用户问题,识别用户意图
+        :param state:
+        :return:
+        """
+        template = """
+            你是一位银行流水分析助手,专门识别用户问题的意图类别。
+
+            ### 任务说明
+            根据用户问题和历史对话,判断用户意图属于以下哪个类别。直接输出对应的数字编号。
+    
+            ### 历史对话:
+            {history_messages}
+
+            ### 当前用户问题:
+            {question}
+
+            ### 意图分类体系:
+            1. **流水查询与检索** - 查找特定交易记录
+               - 示例:"查一下昨天给张三的转账"、"找一笔5000元的交易"
+            2. **统计与汇总** - 计算数值、统计信息
+               - 示例:"本月总支出多少"、"工资收入总计多少"
+            3. **洞察与分析** - 分析模式、趋势、异常
+               - 示例:"我的消费趋势如何"、"有没有异常交易"
+            4. **生成报告**  - 生成一份分析报告
+               - 示例:"生成一份收入与支持的分析报告"
+            5. **数据操作与证明** - 验证交易
+               - 示例:"导出本月流水"、"查这笔交易的流水号"
+            6. **超出能力边界** - 系统无法处理的问题
+               - 示例:"帮我转账"、"下个月我能存多少钱"
+            7. **意图澄清** - 问题模糊需要进一步确认
+               - 示例:"花了多少钱"(无时间范围)、"查一下交易"(无具体条件)
+            8. ** 打招呼 **
+               - 示例: "你好"
+    
+            ### 输出规则:
+            - 只输出1个数字(1-8),不要任何解释
+            - 如果问题不明确,优先选7(意图澄清)
+            - 如果问题超出流水分析范围,选6(超出能力边界)
+            - 如果有多个意图,选最主要的一个
+        """
+        pt = ChatPromptTemplate.from_template(template)
+        chain = pt | self.llm
+
+        # 从state获取历史消息
+        history_messages = state.get("history_messages", [])
+        # 转换为字符串格式
+        history_str = "\n".join([f"{msg.type}: {msg.content}" for msg in history_messages])
+
+        try:
+
+            response = chain.invoke({"question": state["question"], "history_messages": history_str})
+            logger.info(f"对用户问题:{state['question']} 进行意图识别:{response.content}")
+
+            # 更新历史
+            new_question = HumanMessage(content=state["question"])
+            ai_message = AIMessage(content=response.content)
+            new_history = history_messages + [new_question] + [ai_message]
+
+            return {"intent_num": response.content, "history_messages": new_history[-100:]}
+        except Exception as e:
+            print(f"用户问题:{state['question']}意图识别异常,{str(e)}")
+            return {"status": "error"}
+
+    def _router(self, state: AgentState):
+        """
+        根据意图进行进行路由
+        :param state:
+        :return:
+        """
+        intent_num = state["intent_num"]
+        if intent_num == "8":
+            return "_say_hello"
+        elif intent_num == "7": # 意图澄清
+            return "_intent_clarify"
+        elif intent_num == "6": # 超出能力边界
+            return "_beyond_ability"
+        elif intent_num == "5": # 数据操作与证明
+            return "_data_op"
+        elif intent_num == "4": # 生成报告
+            return "_gen_report"
+        elif intent_num == "3": # 洞察分析
+            return "_analysis"
+        elif intent_num == "2": # 统计汇总
+            return "_statistics"
+        elif intent_num == "1": # 流水明细查询
+            return "_query"
+        else:
+            return "_invalid"
+
    def _say_hello(self, state: AgentState):
        """Handle a greeting intent: have the LLM produce a friendly reply.

        :param state: current agent state (reads ``question``)
        :return: ``{"answer": ...}`` on success,
                 ``{"status": "error", "message": ...}`` on failure
        """
        # Prompt text is runtime behavior; kept verbatim (Chinese).
        template = """
            你是交易流水分析助手,用户在跟你打招呼,请合理组织话术并进行回答。
            
            ### 用户说
            {question}
            
            ### 回答要求
            - 友好
            - 专业
            - 快速
        """
        pt = ChatPromptTemplate.from_template(template)
        chain = pt | self.llm
        try:
            response = chain.invoke({"question": state["question"]})
            logger.info(f"用户说:{state['question']}, AI回答: {response.content}")
            return {"answer": response.content}
        except Exception as e:
            # NOTE(review): print() here, while sibling nodes use `logger`.
            print(f"say_hello异常 {str(e)}")
            return {"status": "error", "message": str(e)}
+
+
    def _intent_clarify(self, state: AgentState):
        """Handle the clarification intent: ask a follow-up question back.

        The LLM is given the question plus the rendered history; its
        counter-question is appended to the history (capped at 100 messages).

        :param state: current agent state (reads ``question``, ``history_messages``)
        :return: ``{"answer", "history_messages"}`` on success,
                 ``{"status": "error", "message"}`` on failure
        """
        template = """
            你是交易流水分析助手,根据用户问题和历史对话,对用户的问题进行反问,以便充分理解用户意图。
            
            ### 用户问题
            {question}
            
            ### 历史对话
            {history_messages}
        """
        pt = ChatPromptTemplate.from_template(template)
        # Render stored history as "role: content" lines for the prompt.
        history_messages = state.get("history_messages", [])
        history_str = "\n".join([f"{msg.type}: {msg.content}" for msg in history_messages])

        chain = pt | self.llm
        try:
            response = chain.invoke({
                "question": state["question"],
                "history_messages": history_str
            })

            # Append this turn; keep only the most recent 100 messages.
            new_question = HumanMessage(content=state["question"])
            ai_message = AIMessage(content=response.content)
            new_history = history_messages + [new_question] + [ai_message]

            return {"answer": response.content,  "history_messages": new_history[-100:]}
        except Exception as e:
            # NOTE(review): failure is returned silently — consider logging.
            return {"status": "error", "message": str(e)}
+
    def _beyond_ability(self, state: AgentState):
        """Politely decline questions outside the assistant's capabilities.

        :param state: current agent state (reads ``question``)
        :return: ``{"answer": ...}`` on success,
                 ``{"status": "error", "message": ...}`` on failure
        """
        template = """
            你是交易流水分析助手,对于用户的问题,已超出你的能力边界,组合合理的话术回答用户。
            
            ### 用户问题
            {question}
            
            ### 要求
            - 友好
            - 专业
            - 快速
        """
        pt = ChatPromptTemplate.from_template(template)
        chain = pt | self.llm
        try:
            response = chain.invoke({"question": state["question"]})
            return {"answer": response.content}
        except Exception as e:
            return {"status": "error", "message": str(e)}
+
    def _data_op(self, state: AgentState):
        """Data-operation / proof intent handler (stub).

        :param state: current agent state (unused)
        :return: fixed "not supported yet" answer
        """
        return {"answer": "暂不支持"}
+
+    def _gen_report(self, state: AgentState):
+        """
+        生成报告节点
+        :param state:
+        :return:
+        """
+        template = """
+        
+        """
+        return {"answer": {"report": {"title": "this is title", "content": "this is content"}}}
+
    def _analysis(self, state: AgentState):
        """Insight-analysis intent handler (stub).

        :param state: current agent state (unused)
        :return: fixed placeholder answer
        """
        return {"answer": "这是洞察分析结果"}
+
    def _statistics(self, state: AgentState):
        """Statistics / aggregation intent handler (stub).

        :param state: current agent state (unused)
        :return: fixed placeholder answer
        """
        return {"answer": "这是统计汇总处理结果"}
+
    def _query(self, state: AgentState):
        """Handle the detail-query intent with a ReAct agent over the dataset.

        Builds a ReAct agent whose system prompt embeds the question and the
        entire dataset, then post-processes the agent's final message.

        :param state: current agent state (reads ``question``, ``data_set``)
        :return: ``{"answer": ...}`` on success,
                 ``{"status": "error", "message": ...}`` on failure
        """

        # Tools available to the ReAct (reasoning/acting) agent:
        # only dataset-structure inspection.
        tools = [understand_dataset_structure]

        system_prompt = f"""
        你是交易流水分析助手,根据用户问题和已有的数据集,检索出对应的数据并返回。

        ### 用户问题
        {state['question']}

        ### 已有数据集
        {state['data_set']}

        ### 要求
        - 首先要理解 数据集结构
        - 必须在 已有数据集(已提供) 中进行检索
        - 检索出 全部 符合要求的记录
        - 如果没有符合要求的记录,则友好提示
        """
        try:
            agent = create_react_agent(model=self.llm, tools=tools, prompt=system_prompt)
            response = agent.invoke({
                "messages": [
                    {
                        "role": "user",
                        "content": f"""
                            ### 用户问题
                            {state['question']}
                        """
                    }
                ]
            })
            answer = response["messages"][-1].content

            # Parse the agent's final answer.
            # NOTE(review): assumes the agent replies with JSON; any plain-text
            # answer is reported as a format error — confirm this is intended.
            try:
                result = json.loads(answer)
                if not result:
                    answer = "没有找到符合要求的记录"
            except json.JSONDecodeError:
                answer = "LLM 的回答格式不正确,请重新提问"

            logger.info(f"用户问题:{state['question']}, AI回答: {answer}")
            return {"answer": answer}
        except Exception as e:
            logger.error(f"_query 异常:{str(e)}")
            return {"status": "error", "message": str(e)}
+
+
    def _invalid(self, state: AgentState):
        """Fallback node for unrecognized intent ids.

        :param state: current agent state (unused)
        :return: fixed "invalid question" answer
        """
        return {"answer": "这是一个无效问题"}
+
    def _knowledge_save(self, state: AgentState):
        """Mine and persist knowledge from the Q&A exchange (placeholder).

        Not implemented — the body is empty and the method returns None.
        It is also not registered as a node in ``_build_graph``.

        :param state: current agent state (unused)
        :return: None
        """
+
    async def process_upload(self, state: AgentState) -> AgentState:
        """Handle an uploaded bank-statement PDF: parse, classify, summarize.

        NOTE(review): this coroutine references ``self.pdf_tools``,
        ``AnalysisTools`` and ``self._categorize_transaction``, none of which
        are defined in this file, and it appends to ``state["messages"]``,
        which is not a declared AgentState field — looks copied from another
        agent; confirm against the intended final version.

        :param state: current agent state (reads ``file_path``)
        :return: the mutated state
        """
        print("🔍 处理文件上传...")
        import os
        file_path = state.get("file_path", "")
        if not file_path or not os.path.exists(file_path):
            state["messages"].append(AIMessage(content="未找到上传的流水文件,请重新上传。"))
            return state

        try:
            # Extract the raw text from the PDF.
            pdf_text = self.pdf_tools.extract_text_from_pdf(file_path)

            # Detect which bank produced the statement.
            bank_type = self.pdf_tools.detect_bank_type(pdf_text)
            state["bank_type"] = bank_type

            # Parse the transaction rows (China Merchants Bank format only).
            if "招商银行" in bank_type:
                transactions_json = self.pdf_tools.parse_cmb_statement(pdf_text)
                transactions = json.loads(transactions_json)

                # Convert to a DataFrame sorted by transaction date.
                df = pd.DataFrame(transactions)
                df['date'] = pd.to_datetime(df['date'])
                df = df.sort_values('date')

                # Tag each row with a spending category.
                df['category'] = df['description'].apply(
                    lambda x: self._categorize_transaction(x)
                )

                state["transactions_df"] = df

                # Build the analysis toolset over the parsed data.
                self.analysis_tools = AnalysisTools(df)

                summary = self.analysis_tools.get_summary_statistics()

                response = f"""
                    ✅ 文件解析成功!

                    📊 基本信息:
                    - 银行类型:{bank_type}
                    - 交易笔数:{summary['total_transactions']} 笔
                    - 时间范围:{summary['date_range']['start']} 至 {summary['date_range']['end']}
                    - 总收入:¥{summary['total_income']:,.2f}
                    - 总支出:¥{summary['total_expense']:,.2f}
                    - 净现金流:¥{summary['net_cash_flow']:,.2f}

                    您可以问我:
                    1. "分析我的消费模式"
                    2. "检测异常交易"
                    3. "生成财务报告"
                    4. "查询大额交易"
                    5. "预测未来余额"
                """

                state["messages"].append(AIMessage(content=response))

            else:
                state["messages"].append(AIMessage(
                    content=f"检测到{bank_type},当前主要支持招商银行格式,其他银行功能正在开发中。"
                ))

            state["current_step"] = "analysis_ready"

        except Exception as e:
            # Parsing failure is reported back to the user, not re-raised.
            state["messages"].append(AIMessage(
                content=f"文件解析失败:{str(e)}"
            ))

        return state
+
+    def ask_question(self, session: str, question: str, data_set_file: str = None) -> dict:
+        """
+        程序调用入口,用户提问
+        :param session:   会话ID
+        :param question:  用户问题
+        :param data_set_file: 数据集文档(json数据文件)
+        :return:
+        """
+
+        data_set = []
+        # 读取 json 文件数据, 格式为 List[Dict]
+        import json
+        import os
+        try:
+            logger.info(f"传入的数据文件路径: {data_set_file}")
+            if data_set_file:
+                if os.path.exists(data_set_file):
+                    with open(data_set_file, 'r', encoding='utf-8') as file:
+                        data_set = json.load(file)
+                        logger.info(f"加载数据条数:{len(data_set)}")
+                        # 加载数据集
+                        df = pd.DataFrame(data_set)
+                        df['txDate'] = pd.to_datetime(df['txDate'])  # 提前转换类型
+                else:
+                    logger.error(f"数据文件:{data_set_file} 不存在,请检查路径!")
+            else:
+                logger.info("未传入数据集文件")
+        except Exception as e:
+            logger.error(f"读取数据文件:{data_set_file}异常,{str(e)}")
+            return {
+                "status": "error",
+                "message": f"读取数据文件:{data_set_file}异常,{str(e)}"
+            }
+
+        if not session or not question:
+            return {
+                "status": "error",
+                "message": "缺少参数"
+            }
+        result = {
+            "status": "success"
+        }
+        try:
+            config = {"configurable": {"thread_id": session}}
+            current_state = self.memory.get(config)
+            history_messages = current_state["channel_values"]["history_messages"] if current_state else []
+            if len(data_set) == 0: # 没有指定数据集
+                data_set = current_state["channel_values"]["data_set"] if current_state else []
+
+            # 执行
+            response = self.graph.invoke({
+                "session": session,
+                "question": question,
+                "data_set": data_set,
+                "transactions_df": df,
+                "history_messages": history_messages,
+            }, config)
+            result["answer"] = response.get("answer","")
+        except Exception as e:
+            print(f"用户对话失败,异常:{str(e)}")
+            result["status"] = "error"
+
+        return result
+
+
+# ==================== 构建智能体图 ====================
def create_bank_statement_agent():
    """Create the bank-statement analysis agent graph.

    NOTE(review): ``TxFlowAnalysisAgent`` defines neither ``analyze_query``
    nor ``route_query`` in this file, so building this graph raises
    AttributeError; ``waiting_for_input`` also routes back to itself, which
    would loop. This looks like work-in-progress — confirm before wiring in.
    """

    # Instance that provides the node callables.
    nodes = TxFlowAnalysisAgent()

    # State graph over the shared AgentState schema.
    workflow = StateGraph(AgentState)

    # Register nodes.
    workflow.add_node("process_upload", nodes.process_upload)
    workflow.add_node("analyze_query", nodes.analyze_query)
    workflow.add_node("waiting_for_input", lambda state: state)  # no-op wait node

    # Entry point.
    workflow.set_entry_point("waiting_for_input")

    # Conditional routing out of the wait node.
    workflow.add_conditional_edges(
        "waiting_for_input",
        nodes.route_query,
        {
            "process_upload": "process_upload",
            "analyze_query": "analyze_query",
            "waiting_for_input": "waiting_for_input"
        }
    )

    # After either handler, return to waiting.
    workflow.add_edge("process_upload", "waiting_for_input")
    workflow.add_edge("analyze_query", "waiting_for_input")

    # Compile the graph (no checkpointer configured here).
    graph = workflow.compile()

    return graph
+
if __name__ == '__main__':
    # Ad-hoc smoke test: requires a reachable LLM plus the dataset file below.
    agent = TxFlowAnalysisAgent()
    question = "你好"
    # Dataset file — NOTE(review): machine-specific absolute path.
    data_file = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json"
    result = agent.ask_question(session="s1", question=question, data_set_file=data_file)
    print(f"问题:{question}, 响应:{result}")

    question = "查询交易日期是2023-01-05对应的收入记录"
    result = agent.ask_question(session="s1", question=question, data_set_file=data_file)
    print(f"问题:{question}, 响应:{result}")

    question = "查询交易对手是绿源农产品公司的记录"
    result = agent.ask_question(session="s1", question=question, data_set_file=data_file)
    print(f"问题:{question}, 响应:{result}")

+ 133 - 0
llmops/agents/data_manager.py

@@ -0,0 +1,133 @@
+import pandas as pd
+import json
+from typing import List, Dict, Any, Tuple
+import os
+
+
class DataManager:
    """Centralized loading, validation and preprocessing of transaction data."""

    # Field mapping: canonical name -> accepted aliases (several naming schemes).
    FIELD_MAPPING = {
        'txId': ['txId', 'transaction_id', '交易ID', 'id'],
        'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
        'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
        'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
        'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
        'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
        'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
        'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
        'createdAt': ['createdAt', 'created_at', '创建时间']
    }

    # Fields every record must provide.
    REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']

    @staticmethod
    def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
        """Load records from a JSON file, resolving field aliases.

        :param file_path: path to a JSON file holding a list of records
        :return: tuple of (standardized records, type-optimized DataFrame)
        :raises FileNotFoundError: if the file does not exist
        :raises ValueError: if the JSON is not a non-empty list
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")

        with open(file_path, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        if not isinstance(raw_data, list):
            raise ValueError("JSON文件内容必须是数组格式")

        if not raw_data:
            raise ValueError("数据文件为空")

        # Perf fix: the set of all known aliases is computed once here; the
        # original rebuilt this list for every key of every record.
        known_aliases = {name for names in DataManager.FIELD_MAPPING.values() for name in names}

        # Normalize each record's field names onto the canonical schema.
        standardized_data = []
        for record in raw_data:
            standardized_record = {}
            for std_field, possible_names in DataManager.FIELD_MAPPING.items():
                # First matching alias wins.
                for name in possible_names:
                    if name in record:
                        standardized_record[std_field] = record[name]
                        break

            # Keep any fields that the mapping does not cover.
            for key, value in record.items():
                if key not in known_aliases:
                    standardized_record[key] = value

            standardized_data.append(standardized_record)

        # Convert to a DataFrame with optimized dtypes.
        df = DataManager._optimize_dataframe(pd.DataFrame(standardized_data))

        return standardized_data, df

    @staticmethod
    def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Coerce columns to appropriate dtypes (invalid values become NaN/NaT)."""

        # Transaction date: keep the date part only.
        if 'txDate' in df.columns:
            df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date

        # Transaction time stays textual.
        if 'txTime' in df.columns:
            df['txTime'] = df['txTime'].astype(str)

        # Monetary columns become numeric.
        for col in ['txAmount', 'txBalance']:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # Record-creation timestamp.
        if 'createdAt' in df.columns:
            df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')

        # Direction is a small closed set -> categorical saves memory.
        if 'txDirection' in df.columns:
            df['txDirection'] = df['txDirection'].astype('category')

        return df

    @staticmethod
    def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """Check that the (first record of the) dataset has all required fields.

        :return: (is_valid, list of error messages)
        """
        errors = []

        if not data:
            return False, ["数据集为空"]

        # Only the first record is inspected — assumes a homogeneous dataset.
        first_record = data[0]
        missing_fields = [field for field in DataManager.REQUIRED_FIELDS
                          if field not in first_record]

        if missing_fields:
            errors.append(f"缺少必需字段: {', '.join(missing_fields)}")

        return len(errors) == 0, errors

    @staticmethod
    def format_data_summary(data: List[Dict[str, Any]]) -> str:
        """Build a one-line human-readable summary of the dataset."""
        if not data:
            return "数据集为空"

        df = pd.DataFrame(data)
        summary = []

        summary.append(f"记录总数: {len(data)}")

        if 'txDate' in df.columns:
            summary.append(f"日期范围: {df['txDate'].min()} 至 {df['txDate'].max()}")

        if 'txAmount' in df.columns:
            summary.append(f"金额范围: {df['txAmount'].min()} 至 {df['txAmount'].max()}")

        if 'txDirection' in df.columns:
            direction_counts = df['txDirection'].value_counts().to_dict()
            summary.append(f"收支分布: {direction_counts}")

        return " | ".join(summary)

+ 159 - 0
llmops/agents/graph.py

@@ -0,0 +1,159 @@
+from langgraph.graph import StateGraph, START, END
+from langgraph.checkpoint.memory import MemorySaver
+from llmops.agents.state import AgentState, convert_numpy_types
+from llmops.agents.planning_agent import planning_node
+from llmops.agents.outline_agent import outline_node
+from llmops.agents.metrics_agent import metrics_node
+
+
def create_report_generation_graph():
    """Build and compile the report-generation graph.

    Planning is the hub: after each worker node (outline, metrics,
    clarification) control returns to the planner, whose next hop is chosen
    by ``route_from_planning`` until the report is finally compiled.
    """

    workflow = StateGraph(AgentState)

    # Register nodes (compiler/clarifier are defined later in this module).
    workflow.add_node("planning_node", planning_node)
    workflow.add_node("outline_generator", outline_node)
    workflow.add_node("metrics_calculator", metrics_node)
    workflow.add_node("report_compiler", compile_final_report)
    workflow.add_node("clarify_node", handle_clarification)

    # Entry point: always start with planning.
    workflow.add_edge(START, "planning_node")

    # Conditional edge: route based on the planner's state.
    workflow.add_conditional_edges(
        "planning_node",
        route_from_planning,
        {
            "outline_generator": "outline_generator",
            "metrics_calculator": "metrics_calculator",
            "report_compiler": "report_compiler",
            "clarify_node": "clarify_node",
            "planning_node": "planning_node",  # keep looping on the planner
            "END": END
        }
    )

    # Worker nodes hand control back to the planner for the next decision.
    workflow.add_edge("outline_generator", "planning_node")
    workflow.add_edge("metrics_calculator", "planning_node")
    workflow.add_edge("clarify_node", "planning_node")

    # Report compilation is terminal.
    workflow.add_edge("report_compiler", END)

    # Compile with in-memory checkpointing; no interrupt points configured.
    return workflow.compile(
        checkpointer=MemorySaver(),
        interrupt_before=[],
        interrupt_after=[]
    )
+
+
def route_from_planning(state: AgentState) -> str:
    """Pick the node that runs after planning and return its name.

    Decision order: hard iteration cap, missing outline / metric
    requirements, forced finish on slow progress, then metric coverage.
    """
    print(f"\n🔍 [路由决策] 步骤={state['planning_step']}, "
          f"大纲版本={state['outline_version']}, "
          f"大纲已生成={state.get('outline_draft') is not None}, "
          f"指标需求={len(state.get('metrics_requirements', []))}, "
          f"已计算={len(state.get('computed_metrics', {}))}")

    step = state['planning_step']

    # Hard stop: too many planning iterations — ship whatever we have.
    if step > 50:
        print("⚠️ 规划步骤超过50次,强制终止并生成报告")
        return "report_compiler"

    # No outline yet -> (re)generate the outline first.
    if not state.get("outline_draft"):
        print("→ 路由到 outline_generator(大纲为空)")
        return "outline_generator"

    # Outline exists but produced no metric requirements -> regenerate it.
    if not state.get("metrics_requirements"):
        print("→ 路由到 outline_generator(指标需求为空)")
        return "outline_generator"

    # Share of required metrics already computed.
    required = len(state["metrics_requirements"])
    computed = len(state["computed_metrics"])
    coverage = computed / required if required else 0

    print(f"  指标覆盖率 = {computed}/{required} = {coverage:.2%}")

    # Forced finish: many iterations with majority coverage.
    if step > 30 and coverage > 0.5:
        print(f"→ 路由到 report_compiler(步骤过多,强制终止,覆盖率={coverage:.2%})")
        return "report_compiler"

    # Below the 80% threshold -> keep computing metrics.
    if coverage < 0.8:
        print(f"→ 路由到 metrics_calculator(覆盖率={coverage:.2%} < 80%)")
        return "metrics_calculator"

    # Coverage is sufficient -> compile the report.
    print(f"→ 路由到 report_compiler(覆盖率={coverage:.2%} ≥ 80%)")
    return "report_compiler"
+
+
def compile_final_report(state: AgentState) -> AgentState:
    """Report-compilation node: merge the outline with computed metrics."""

    # The outline may be a Pydantic model; normalize it to a plain dict.
    outline = state["outline_draft"]
    outline_dict = outline.dict() if hasattr(outline, 'dict') else outline

    metrics = state["computed_metrics"]

    # Attach each section's required metrics (or a "missing data" marker).
    sections = [
        {
            "title": sec["title"],
            "description": sec["description"],
            "metrics": {mid: metrics.get(mid, "数据缺失") for mid in sec["metrics_needed"]},
        }
        for sec in outline_dict["sections"]
    ]

    final_report = {
        "title": outline_dict["report_title"],
        "sections": sections,
        "summary": {
            "total_metrics": len(metrics),
            "required_metrics": len(outline_dict["global_metrics"]),
            "coverage_rate": float(state["completeness_score"]),
            "planning_iterations": int(state["planning_step"]),
        },
    }

    updated = {
        **state,
        "answer": final_report,
        "status": "success",
        "messages": state["messages"] + [("ai", f"🎉 报告生成完成:{outline_dict['report_title']}")],
    }

    # Strip numpy scalar types before handing the state back to the graph.
    return convert_numpy_types(updated)
+
+
def handle_clarification(state: AgentState) -> AgentState:
    """Clarification node: ask the user for detail and mark the run complete."""
    updated = dict(state)
    updated.update(
        status="clarifying",
        is_complete=True,
        answer="需要更多信息,请明确您的报告需求",
    )

    # Strip numpy scalar types before handing the state back to the graph.
    return convert_numpy_types(updated)

+ 309 - 0
llmops/agents/main.py

@@ -0,0 +1,309 @@
+import asyncio
+import json
+import os
+import sys
+from typing import List, Dict, Any
+from llmops.agents.graph import create_report_generation_graph
+from llmops.agents.data_manager import DataManager
+from llmops.agents.state import create_initial_state
+
+# ============= 数据加载方法 =============
+
def load_sample_data() -> List[Dict[str, Any]]:
    """Return built-in sample transactions matching the full field schema."""
    field_names = (
        "txId", "txDate", "txTime", "txAmount", "txBalance",
        "txDirection", "txSummary", "txCounterparty", "createdAt",
    )
    sample_rows = [
        ("TX202512310001", "2025-12-31", "13:55", 3100, 245600,
         "收入", "中药材服务费", "康恩贝药业", "2025-11-30 05:57"),
        ("TX202512310002", "2025-12-30", "09:20", 15000, 242500,
         "收入", "货款结算", "同仁堂医药", "2025-11-29 18:23"),
        # ... more sample rows
    ]
    return [dict(zip(field_names, row)) for row in sample_rows]
+
+
def load_data_from_file(file_path: str) -> List[Dict[str, Any]]:
    """
    Load transaction-flow data from a JSON file.

    Args:
        file_path: Full path to the JSON file.

    Returns:
        List of transaction records.

    Raises:
        ValueError: If loading fails or the data does not pass schema
            validation; the original exception is chained as ``__cause__``
            so the full traceback is preserved.
    """
    try:
        print(f"📁 加载数据文件: {file_path}")
        data, df = DataManager.load_from_file(file_path)

        # Validate data quality before handing it to the pipeline.
        is_valid, errors = DataManager.validate_data_schema(data)
        if not is_valid:
            raise ValueError(f"数据验证失败: {'; '.join(errors)}")

        # Print a human-readable summary of what was loaded.
        print(DataManager.format_data_summary(data))

        return data

    except Exception as e:
        # Fix: chain the original exception (PEP 3134) instead of
        # discarding it, so debugging retains the real failure site.
        raise ValueError(f"数据加载失败: {e}") from e
+
+
def create_sample_file():
    """Write the built-in sample transactions to a JSON file on disk."""
    sample_file = "sample_transactions.json"
    records = load_sample_data()

    with open(sample_file, 'w', encoding='utf-8') as fp:
        json.dump(records, fp, ensure_ascii=False, indent=2)

    print(f"✅ 示例文件已创建: {sample_file}")
    print(f"📍 文件位置: {os.path.abspath(sample_file)}")
+
+
async def run_report_generation(
        requirement: str,
        data: List[Dict[str, Any]],
        session_id: str = None
) -> Dict[str, Any]:
    """Run the report-generation flow end to end.

    Args:
        requirement: Natural-language description of the desired report.
        data: Transaction records; validated against the schema first.
        session_id: Optional session/thread id; derived from the
            requirement hash when omitted.

    Returns:
        The final report payload (the graph's ``answer`` value).

    Raises:
        ValueError: If validation fails, the graph finishes without an
            ``answer``, or the produced report carries an ``error`` key.
    """

    # Validate the input data before running the graph.
    print(f"\n🔍 验证数据格式...")
    is_valid, errors = DataManager.validate_data_schema(data)
    if not is_valid:
        raise ValueError(f"数据验证失败: {'; '.join(errors)}")

    # 1. Build the initial state -- key fix: go through create_initial_state
    if session_id is None:
        session_id = f"report_session_{hash(requirement) % 10000}"

    # Key fix: use the cleaned (numpy-free) initial state.
    initial_state = create_initial_state(
        question=requirement,
        data=data,
        session_id=session_id
    )

    # 2. Create the graph
    graph = create_report_generation_graph()

    # 3. Runtime configuration
    config = {
        "configurable": {"thread_id": initial_state["session_id"]},
        "recursion_limit": 50  # raised to 50
    }

    print(f"\n🚀 开始报告生成流程 (会话: {session_id})...\n")

    # 4. Execute and stream node updates to stdout
    step = 1
    try:
        async for event in graph.astream(initial_state, config, stream_mode="updates"):
            node_name = list(event.keys())[0]
            node_output = event[node_name]

            if "messages" in node_output and node_output["messages"]:
                message = node_output["messages"][-1]
                if isinstance(message, tuple):
                    role, content = message
                    print(f"📍 Step {step:02d} | {node_name}: {content}")

                    # Trace intermediate state keys for debugging.
                    if "metrics_requirements" in node_output:
                        reqs = node_output["metrics_requirements"]
                        print(f"   📊 指标需求: {[m.metric_id for m in reqs]}")

                    if "computed_metrics" in node_output:
                        computed = node_output["computed_metrics"]
                        print(f"   ✅ 已计算: {list(computed.keys())}")

                    if "pending_metric_ids" in node_output:
                        pending = node_output["pending_metric_ids"]
                        print(f"   ⏳ 待计算: {pending}")
                elif isinstance(message, str):
                    print(f"📍 Step {step:02d} | {node_name}: {message}")

            step += 1

            # Safety valve: guard against an infinite loop in the graph.
            if step > 100:
                print("⚠️ 达到最大步数限制(100),强制终止")
                break

    except Exception as e:
        print(f"❌ 图执行异常: {e}")
        import traceback
        traceback.print_exc()
        raise

    # 5. Wait briefly, then fetch the final checkpointed state.
    await asyncio.sleep(0.5)
    final_state = await graph.aget_state(config)

    # 6. Debug output and error handling
    print(f"\nDebug: 最终状态键 = {list(final_state.values.keys())}")
    print(f"Debug: is_complete = {final_state.values.get('is_complete')}")
    print(f"Debug: 执行步数 = {step - 1}")

    # Ensure the graph actually produced an answer.
    if "answer" not in final_state.values:
        print(f"❌ 错误:最终状态中缺少 'answer' 键")
        print(f"大纲状态: {final_state.values.get('outline_draft')}")
        print(f"最后路由: {final_state.values.get('next_route')}")
        raise ValueError(
            f"报告生成未完成:缺少 'answer' 键\n"
            f"最后路由: {final_state.values.get('next_route')}\n"
            f"大纲草稿: {final_state.values.get('outline_draft') is not None}\n"
            f"已计算指标: {len(final_state.values.get('computed_metrics', {}))}"
        )

    report = final_state.values["answer"]

    # 7. Surface report-level errors to the caller.
    if isinstance(report, dict) and "error" in report:
        raise ValueError(f"报告生成出错: {report['error']}")

    return report
+
+
def print_usage():
    """Print CLI usage instructions to stdout."""
    usage_text = """
使用说明:
  python main.py              # 运行主程序(需配置文件路径)
  python main.py --create-sample  # 创建示例数据文件
  python main.py --help       # 显示帮助信息
  python main.py --validate file.json  # 验证数据文件格式

数据文件要求:
  JSON数组,每条记录包含以下字段:
  - txId: 交易ID (字符串)
  - txDate: 交易日期 (YYYY-MM-DD)
  - txTime: 交易时间 (HH:MM)
  - txAmount: 交易金额 (数值)
  - txBalance: 交易后余额 (数值)
  - txDirection: 交易方向 (收入/支出)
  - txSummary: 交易摘要 (字符串)
  - txCounterparty: 交易对手 (字符串)
  - createdAt: 记录创建时间 (YYYY-MM-DD HH:MM:SS)
    """
    print(usage_text)
+
+
def validate_file(file_path: str):
    """Validate a data file; return True when it loads cleanly, else False."""
    print(f"🔍 验证文件: {file_path}")
    try:
        load_data_from_file(file_path)
    except Exception as e:
        print(f"❌ 验证失败: {e}")
        return False
    print("✅ 文件验证通过!")
    return True
+
+
def handle_command_line():
    """Dispatch on command-line arguments.

    With no arguments the main program runs; otherwise the first
    argument selects a utility action (sample creation, help,
    or file validation).
    """
    # Fix: removed the redundant function-local `import sys` -- sys is
    # already imported at module level.
    if len(sys.argv) == 1:
        asyncio.run(main())
    elif sys.argv[1] == "--create-sample":
        create_sample_file()
    elif sys.argv[1] == "--help":
        print_usage()
    elif sys.argv[1] == "--validate" and len(sys.argv) > 2:
        validate_file(sys.argv[2])
    else:
        print("未知参数,使用 --help 查看帮助")
+
+
async def main():
    """Entry point: demonstrate data loading and report generation."""

    print("=" * 70)
    print("  交易流水报告生成系统 (支持完整字段格式)")
    print("=" * 70)

    # ===== Configuration =====
    use_sample_data = False  # True: use built-in sample data; False: load a real file

    if use_sample_data:
        print("\n📊 使用内置示例数据...")
        data = load_sample_data()
        requirement = "生成2025年12月交易分析报告,重点关注收入支出分布、主要交易对手、余额变化趋势和大额交易"

    else:
        # Change this to your own file path.
        file_path = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json"

        # If the file does not exist, offer to create a sample file instead.
        if not os.path.exists(file_path):
            print(f"\n⚠️ 文件不存在: {file_path}")
            response = input("是否创建示例文件? (y/n): ")
            if response.lower() == 'y':
                create_sample_file()
                file_path = "sample_transactions.json"
            else:
                print("\n💡 提示: 请将你的JSON文件路径赋值给 file_path 变量")
                return

        try:
            print(f"\n📁 从文件加载数据: {file_path}")
            data = load_data_from_file(file_path)
            requirement = input("\n请输入报告需求: ") or \
                          "生成交易分析报告,包含收入趋势、主要客户、大额交易和余额分析"

        except Exception as e:
            print(f"\n❌ 数据加载失败: {e}")
            return

    # Run report generation and print/persist the result.
    try:
        report = await run_report_generation(
            requirement=requirement,
            data=data,
            session_id="demo_session_001"
        )

        # Print the generated report.
        print("\n" + "=" * 70)
        print("📊 最终生成报告")
        print("=" * 70)
        print(json.dumps(report, ensure_ascii=False, indent=2))

        # Persist the report to disk.
        output_file = "generated_report.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"\n💾 报告已保存到: {os.path.abspath(output_file)}")

    except Exception as e:
        print(f"\n❌ 报告生成失败: {e}")
        import traceback
        traceback.print_exc()
+
+
# Script entry point: route through the CLI argument handler.
if __name__ == "__main__":
    handle_command_line()

+ 127 - 0
llmops/agents/state.py

@@ -0,0 +1,127 @@
+from typing import TypedDict, List, Dict, Any, Optional
+from langchain_core.messages import BaseMessage
+from pydantic import BaseModel, Field
+import numpy as np
+
+
+# ============= 数据模型 =============
+
class MetricRequirement(BaseModel):
    """Definition of a single metric the report needs computed."""
    metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")
    metric_name: str = Field(description="指标中文名称")
    calculation_logic: str = Field(description="计算逻辑描述")
    required_fields: List[str] = Field(description="所需字段")
    # presumably IDs of metrics that must be computed first -- TODO confirm
    dependencies: List[str] = Field(default_factory=list)
+
+
class ReportSection(BaseModel):
    """A single section of the report outline."""
    section_id: str = Field(description="章节ID")
    title: str = Field(description="章节标题")
    description: str = Field(description="章节内容要求")
    metrics_needed: List[str] = Field(description="所需指标ID列表")
+
+
class ReportOutline(BaseModel):
    """Complete report outline: title, sections, and all required metrics."""
    report_title: str  # title of the generated report
    sections: List[ReportSection]  # ordered report sections
    global_metrics: List[MetricRequirement]  # metrics required across the report
+
+
+# ============= 序列化工具函数 =============
+
def convert_numpy_types(obj: Any) -> Any:
    """
    Recursively replace numpy types with native Python equivalents.

    Containers are rebuilt (dict keys are stringified) and numpy scalars
    and arrays are converted so the result is fully serializable.
    """
    if isinstance(obj, dict):
        return {str(key): convert_numpy_types(val) for key, val in obj.items()}
    if isinstance(obj, (list, tuple)):
        converted = [convert_numpy_types(elem) for elem in obj]
        return tuple(converted) if isinstance(obj, tuple) else converted
    if isinstance(obj, set):
        return {convert_numpy_types(elem) for elem in obj}
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return convert_numpy_types(obj.tolist())
    if hasattr(obj, 'item') and hasattr(obj, 'dtype'):  # other numpy scalars
        return convert_numpy_types(obj.item())
    return obj
+
+
def create_initial_state(question: str, data: List[Dict[str, Any]], session_id: str = None) -> Dict[str, Any]:
    """Build a fresh agent state with all input data cleaned of numpy types."""
    sid = str(session_id) if session_id else "default_session"
    return dict(
        question=str(question),
        data_set=convert_numpy_types(data),
        transactions_df=None,
        planning_step=0,
        plan_history=[],
        outline_draft=None,
        outline_version=0,
        metrics_requirements=[],
        computed_metrics={},
        metrics_cache={},
        pending_metric_ids=[],
        failed_metric_attempts={},
        report_draft={},
        is_complete=False,
        completeness_score=0.0,
        messages=[],
        current_node="start",
        session_id=sid,
        next_route="planning_node",
        outline_ready=False,
        metrics_ready=False,
        last_decision="init",
    )
+
+
+# ============= 状态定义 =============
+
class AgentState(TypedDict):
    """Shared state threaded through the report-generation graph."""
    # === Input layer ===
    question: str                   # user's natural-language report requirement
    data_set: List[Dict[str, Any]]  # raw transaction records (numpy-cleaned)
    transactions_df: Optional[Any]  # presumably a pandas DataFrame view -- TODO confirm

    # === Planning layer ===
    planning_step: int       # number of planning iterations performed
    plan_history: List[str]  # log of past planning decisions

    # === Outline layer ===
    outline_draft: Optional[ReportOutline]  # current report outline, if drafted
    outline_version: int                    # revision counter for the outline

    # === Metrics layer ===
    metrics_requirements: List[MetricRequirement]  # metrics the outline requires
    computed_metrics: Dict[str, Any]               # metric_id -> computed value
    metrics_cache: Dict[str, Any]                  # cache of metric results
    pending_metric_ids: List[str]                  # metrics still awaiting computation
    failed_metric_attempts: Dict[str, int]         # metric_id -> failure count

    # === Result layer ===
    report_draft: Dict[str, Any]  # work-in-progress report content
    is_complete: bool             # whether the flow has finished
    completeness_score: float     # metric coverage ratio (0.0-1.0)

    # === Conversation / routing ===
    messages: List[BaseMessage]  # conversation history (nodes also append (role, text) tuples)
    current_node: str            # name of the node currently executing
    session_id: str              # checkpointing/session identifier
    next_route: str              # next node chosen by the router
    outline_ready: bool          # outline finalized flag
    metrics_ready: bool          # metrics finalized flag
    last_decision: str           # most recent routing decision

+ 0 - 0
llmops/agents/tools/__init__.py