chat_bot.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  import json
  import logging
  import os
  import re
  from typing import TypedDict, Any, List, Dict

  import pandas as pd
  from langchain_core.messages import AIMessage, HumanMessage, BaseMessage
  from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
  from langgraph.graph import StateGraph, START, END
  from langgraph.prebuilt import create_react_agent

  from llmops.agents.datadev.llm import get_llm
  from llmops.agents.datadev.memory.memory_saver_with_expiry2 import MemorySaverWithExpiry
  from llmops.agents.tools.unerstand_dataset_tool import understand_dataset_structure
  # Module-level logger named after this module.
  logger = logging.getLogger(__name__)

  # NOTE(review): this configures the *root* logger (level DEBUG) as a side effect
  # of importing the module; consider moving it to the application entry point.
  logging.basicConfig(
      level=logging.DEBUG,
      format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
      datefmt="%Y-%m-%d %H:%M:%S",
  )
  class AgentState(TypedDict):
      """State shared by all nodes of the ChatBot (transaction-flow analysis) graph.

      Only keys declared here are kept by the graph, so every key a node returns
      or a caller passes in must be listed (e.g. ``message`` for error details).
      """

      session: str  # session id passed in by ask_question (also the thread_id)
      question: str  # current user question
      data_set: List[Any]  # raw dataset records, e.g. [{...}, ...]
      transactions_df: pd.DataFrame  # the dataset as a pandas DataFrame
      intent_num: str  # intent category id ("1"-"8") from intent recognition
      history_messages: List[BaseMessage]  # conversation history
      messages: List[BaseMessage]  # chat messages appended by process_upload
      bank_type: str  # bank type detected from an uploaded statement
      analysis_results: Dict[str, Any]  # analysis results
      current_step: str  # current processing step
      file_path: str  # path of the dataset / statement file
      file_hash: str  # hash of the dataset file
      status: str  # "success" | "error"
      message: str  # error detail that accompanies status == "error"
      answer: Any  # AI answer
  class TxFlowAnalysisAgent:
      """Transaction-flow (bank statement) analysis agent.

      Each call to ``ask_question`` runs one turn of a LangGraph state graph:
      ``START -> _intent_recognition -> <one handler node> -> END``. Graph state is
      checkpointed per session (``thread_id``) in ``self.memory``.

      Node methods annotate ``state`` with the quoted forward reference
      ``"AgentState"``; LangGraph resolves it from this module's globals.
      """

      # Maximum number of messages kept in ``history_messages``.
      MAX_HISTORY = 100

      # Intent id emitted by the classifier prompt -> handler node name.
      _INTENT_ROUTES = {
          "1": "_query",  # transaction lookup / retrieval
          "2": "_statistics",  # statistics and aggregation
          "3": "_analysis",  # insight and analysis
          "4": "_gen_report",  # report generation
          "5": "_data_op",  # data operations / verification
          "6": "_beyond_ability",  # outside the assistant's capabilities
          "7": "_intent_clarify",  # ambiguous question, ask back
          "8": "_say_hello",  # greeting
      }
      # Handler used when the classifier output is not a known intent id.
      _FALLBACK_ROUTE = "_invalid"

      def __init__(self):
          self.llm = get_llm()
          # Session memory used as the graph checkpointer; constructed with
          # expire_time=600 / clean_interval=60 (presumably seconds -- TODO confirm).
          self.memory = MemorySaverWithExpiry(expire_time=600, clean_interval=60)
          self.graph = self._build_graph()

      def _build_graph(self):
          """Build and compile the one-turn graph with ``self.memory`` as checkpointer."""
          handler_names = [*self._INTENT_ROUTES.values(), self._FALLBACK_ROUTE]
          graph_builder = StateGraph(AgentState)
          graph_builder.add_node("_intent_recognition", self._intent_recognition)
          for name in handler_names:
              graph_builder.add_node(name, getattr(self, name))
              # Every handler answers the turn and finishes the run.
              graph_builder.add_edge(name, END)
          graph_builder.add_edge(START, "_intent_recognition")
          path_map = {name: name for name in handler_names}
          # _router returns END when intent recognition itself failed.
          path_map[END] = END
          graph_builder.add_conditional_edges(
              source="_intent_recognition", path=self._router, path_map=path_map
          )
          return graph_builder.compile(checkpointer=self.memory)

      # ------------------------------------------------------------------ helpers

      @staticmethod
      def _format_history(history_messages):
          """Render messages as ``"<type>: <content>"`` lines for prompt injection."""
          return "\n".join(f"{msg.type}: {msg.content}" for msg in history_messages)

      @staticmethod
      def _parse_intent(content):
          """Extract the intent id ("1"-"8") from the classifier output.

          Tolerates surrounding whitespace or markup such as a trailing newline or
          ``**3**``; a digit that is part of a longer number (e.g. "10") is ignored.
          Returns ``""`` when no standalone digit 1-8 is present.
          """
          match = re.search(r"(?<!\d)[1-8](?!\d)", str(content))
          return match.group(0) if match else ""

      @staticmethod
      def _parse_query_answer(raw):
          """Normalize the ReAct agent's final reply for the query node.

          - A surrounding Markdown code fence (```json ... ```) is stripped.
          - Empty JSON (``[]``, ``{}``, ``null`` ...) becomes a friendly
            "no matching records" message.
          - A non-empty JSON value is returned as its (unfenced) JSON text.
          - A reply that is not JSON is returned unchanged, so the model's prose
            answer is not thrown away.
          """
          text = str(raw).strip()
          fenced = re.fullmatch(r"```(?:json)?\s*(.*?)\s*```", text, flags=re.DOTALL)
          if fenced:
              text = fenced.group(1)
          try:
              result = json.loads(text)
          except json.JSONDecodeError:
              return raw
          if not result:
              return "没有找到符合要求的记录"
          return text

      def _record_turn(self, state, answer):
          """Return the history extended with this turn's question and answer.

          Non-string answers (e.g. the report dict) are stored as JSON text. Only
          the last ``MAX_HISTORY`` messages are kept.
          """
          if isinstance(answer, str):
              answer_text = answer
          else:
              answer_text = json.dumps(answer, ensure_ascii=False, default=str)
          history = list(state.get("history_messages") or [])
          history.append(HumanMessage(content=state["question"]))
          history.append(AIMessage(content=answer_text))
          return history[-self.MAX_HISTORY:]

      def _answer(self, state, answer):
          """Node update that sets ``answer`` and records the turn in the history."""
          return {"answer": answer, "history_messages": self._record_turn(state, answer)}

      def _reply_with_prompt(self, state, template, **variables):
          """Run a single-prompt LLM chain and turn its output into a node update.

          Returns the ``_answer`` update on success, or
          ``{"status": "error", "message": ...}`` if the LLM call raises.
          """
          chain = ChatPromptTemplate.from_template(template) | self.llm
          try:
              response = chain.invoke(variables)
          except Exception as e:
              logger.exception("LLM 调用异常,用户问题:%s", state["question"])
              return {"status": "error", "message": str(e)}
          logger.info("用户说:%s, AI回答: %s", state["question"], response.content)
          return self._answer(state, response.content)

      # -------------------------------------------------------------------- nodes

      def _intent_recognition(self, state: "AgentState"):
          """Classify the user's question into one of the intent ids "1"-"8".

          Always writes both ``intent_num`` and ``status``, so values checkpointed
          by a previous turn never influence routing. On LLM failure ``intent_num``
          is ``""`` and ``status`` is ``"error"``. The conversation history is read
          here but recorded by the handler nodes, not by this classifier.
          """
          template = """
          你是一位银行流水分析助手,专门识别用户问题的意图类别。
          ### 任务说明
          根据用户问题和历史对话,判断用户意图属于以下哪个类别。直接输出对应的数字编号。
          ### 历史对话:
          {history_messages}
          ### 当前用户问题:
          {question}
          ### 意图分类体系:
          1. **流水查询与检索** - 查找特定交易记录
             - 示例:"查一下昨天给张三的转账"、"找一笔5000元的交易"
          2. **统计与汇总** - 计算数值、统计信息
             - 示例:"本月总支出多少"、"工资收入总计多少"
          3. **洞察与分析** - 分析模式、趋势、异常
             - 示例:"我的消费趋势如何"、"有没有异常交易"
          4. **生成报告** - 生成一份分析报告
             - 示例:"生成一份收入与支出的分析报告"
          5. **数据操作与证明** - 验证交易
             - 示例:"导出本月流水"、"查这笔交易的流水号"
          6. **超出能力边界** - 系统无法处理的问题
             - 示例:"帮我转账"、"下个月我能存多少钱"
          7. **意图澄清** - 问题模糊需要进一步确认
             - 示例:"花了多少钱"(无时间范围)、"查一下交易"(无具体条件)
          8. **打招呼** - 问候、寒暄
             - 示例:"你好"
          ### 输出规则:
          - 只输出1个数字(1-8),不要任何解释
          - 如果问题不明确,优先选7(意图澄清)
          - 如果问题超出流水分析范围,选6(超出能力边界)
          - 如果有多个意图,选最主要的一个
          """
          chain = ChatPromptTemplate.from_template(template) | self.llm
          history_str = self._format_history(state.get("history_messages") or [])
          try:
              response = chain.invoke(
                  {"question": state["question"], "history_messages": history_str}
              )
          except Exception as e:
              logger.exception("用户问题:%s意图识别异常", state["question"])
              return {"intent_num": "", "status": "error", "message": str(e)}
          logger.info("对用户问题:%s 进行意图识别:%s", state["question"], response.content)
          return {"intent_num": self._parse_intent(response.content), "status": "success"}

      def _router(self, state: "AgentState"):
          """Choose the next node for the recognized intent.

          Returns END if intent recognition failed, the mapped handler for a known
          intent id, and ``"_invalid"`` for anything else.
          """
          if state.get("status") == "error":
              return END
          return self._INTENT_ROUTES.get(state.get("intent_num", ""), self._FALLBACK_ROUTE)

      def _say_hello(self, state: "AgentState"):
          """Answer a greeting with an LLM-generated friendly reply."""
          template = """
          你是交易流水分析助手,用户在跟你打招呼,请合理组织话术并进行回答。
          ### 用户说
          {question}
          ### 回答要求
          - 友好
          - 专业
          - 快速
          """
          return self._reply_with_prompt(state, template, question=state["question"])

      def _intent_clarify(self, state: "AgentState"):
          """Ask the user a clarifying question, using the conversation history."""
          template = """
          你是交易流水分析助手,根据用户问题和历史对话,对用户的问题进行反问,以便充分理解用户意图。
          ### 用户问题
          {question}
          ### 历史对话
          {history_messages}
          """
          return self._reply_with_prompt(
              state,
              template,
              question=state["question"],
              history_messages=self._format_history(state.get("history_messages") or []),
          )

      def _beyond_ability(self, state: "AgentState"):
          """Politely tell the user the request is outside the assistant's scope."""
          template = """
          你是交易流水分析助手,对于用户的问题,已超出你的能力边界,组织合理的话术回答用户。
          ### 用户问题
          {question}
          ### 要求
          - 友好
          - 专业
          - 快速
          """
          return self._reply_with_prompt(state, template, question=state["question"])

      def _data_op(self, state: "AgentState"):
          """Data operation / verification node (not supported yet)."""
          return self._answer(state, "暂不支持")

      def _gen_report(self, state: "AgentState"):
          """Report generation node.

          TODO: placeholder -- returns a fixed report dict.
          """
          return self._answer(
              state, {"report": {"title": "this is title", "content": "this is content"}}
          )

      def _analysis(self, state: "AgentState"):
          """Insight / analysis node (placeholder answer)."""
          return self._answer(state, "这是洞察分析结果")

      def _statistics(self, state: "AgentState"):
          """Statistics / aggregation node (placeholder answer)."""
          return self._answer(state, "这是统计汇总处理结果")

      def _query(self, state: "AgentState"):
          """Retrieve matching records from the session dataset with a ReAct agent.

          The agent gets the ``understand_dataset_structure`` tool and the dataset
          in its system prompt. It is asked to answer with a JSON array of matching
          records (``[]`` when nothing matches); the reply is normalized by
          ``_parse_query_answer``.
          """
          question = state["question"]
          tools = [understand_dataset_structure]
          system_prompt = f"""
          你是交易流水分析助手,根据用户问题和已有的数据集,检索出对应的数据并返回。
          ### 用户问题
          {question}
          ### 已有数据集
          {state.get('data_set', [])}
          ### 要求
          - 首先要理解 数据集结构
          - 必须在 已有数据集(已提供) 中进行检索
          - 检索出 全部 符合要求的记录
          - 只输出 JSON 数组格式的检索结果,不要输出任何解释
          - 如果没有符合要求的记录,输出 []
          """
          try:
              agent = create_react_agent(model=self.llm, tools=tools, prompt=system_prompt)
              response = agent.invoke(
                  {"messages": [{"role": "user", "content": f"\n### 用户问题\n{question}\n"}]}
              )
              raw_answer = response["messages"][-1].content
          except Exception as e:
              logger.exception("_query 异常,用户问题:%s", question)
              return {"status": "error", "message": str(e)}
          answer = self._parse_query_answer(raw_answer)
          logger.info("用户问题:%s, AI回答: %s", question, answer)
          return self._answer(state, answer)

      def _invalid(self, state: "AgentState"):
          """Fallback node for unrecognized intents."""
          return self._answer(state, "这是一个无效问题")

      def _knowledge_save(self, state: "AgentState"):
          """Discover and store knowledge from user questions and AI answers.

          TODO: not implemented and not registered as a graph node; returns None.
          """

      async def process_upload(self, state: "AgentState") -> "AgentState":
          """Parse an uploaded bank-statement file and summarize it (legacy flow).

          Appends the outcome as an ``AIMessage`` to ``state["messages"]`` (created
          if missing) and returns the mutated state.

          NOTE(review): relies on ``self.pdf_tools``, ``self._categorize_transaction``
          and ``AnalysisTools``, none of which are defined in this file. Until they
          are provided, the except branch appends a "文件解析失败" message.
          """
          logger.info("🔍 处理文件上传...")
          # "messages" may be absent from the incoming state; create it instead of
          # raising KeyError.
          messages = state.setdefault("messages", [])
          file_path = state.get("file_path", "")
          if not file_path or not os.path.exists(file_path):
              messages.append(AIMessage(content="未找到上传的流水文件,请重新上传。"))
              return state
          try:
              pdf_text = self.pdf_tools.extract_text_from_pdf(file_path)
              bank_type = self.pdf_tools.detect_bank_type(pdf_text)
              state["bank_type"] = bank_type
              if "招商银行" in bank_type:
                  transactions = json.loads(self.pdf_tools.parse_cmb_statement(pdf_text))
                  df = pd.DataFrame(transactions)
                  df["date"] = pd.to_datetime(df["date"])
                  df = df.sort_values("date")
                  df["category"] = df["description"].apply(self._categorize_transaction)
                  state["transactions_df"] = df
                  self.analysis_tools = AnalysisTools(df)
                  summary = self.analysis_tools.get_summary_statistics()
                  response = f"""
          ✅ 文件解析成功!
          📊 基本信息:
          - 银行类型:{bank_type}
          - 交易笔数:{summary['total_transactions']} 笔
          - 时间范围:{summary['date_range']['start']} 至 {summary['date_range']['end']}
          - 总收入:¥{summary['total_income']:,.2f}
          - 总支出:¥{summary['total_expense']:,.2f}
          - 净现金流:¥{summary['net_cash_flow']:,.2f}
          您可以问我:
          1. "分析我的消费模式"
          2. "检测异常交易"
          3. "生成财务报告"
          4. "查询大额交易"
          5. "预测未来余额"
          """
                  messages.append(AIMessage(content=response))
              else:
                  messages.append(AIMessage(
                      content=f"检测到{bank_type},当前主要支持招商银行格式,其他银行功能正在开发中。"
                  ))
              state["current_step"] = "analysis_ready"
          except Exception as e:
              logger.exception("文件解析失败:%s", file_path)
              messages.append(AIMessage(content=f"文件解析失败:{str(e)}"))
          return state

      # --------------------------------------------------------------- entry point

      @staticmethod
      def _load_data_set(data_set_file):
          """Load a JSON dataset file and build its DataFrame.

          ``txDate`` is converted to datetime when that column exists. An empty
          dataset, or one without ``txDate``, is loaded without raising KeyError.

          :return: ``(data_set, transactions_df)``
          """
          with open(data_set_file, "r", encoding="utf-8") as file:
              data_set = json.load(file)
          logger.info("加载数据条数:%d", len(data_set))
          transactions_df = pd.DataFrame(data_set)
          if "txDate" in transactions_df.columns:
              transactions_df["txDate"] = pd.to_datetime(transactions_df["txDate"])
          return data_set, transactions_df

      def ask_question(self, session: str, question: str, data_set_file: str = None) -> dict:
          """Program entry point: answer one user question within a session.

          :param session: session id, used as the LangGraph ``thread_id``
          :param question: user question
          :param data_set_file: optional path to a JSON dataset file. When it is
              omitted, missing, or empty, the dataset checkpointed for the session
              (if any) is reused.
          :return: ``{"status": "success", "answer": ...}``. If the dataset file
              cannot be read, a parameter is missing, or the graph fails,
              ``status`` is ``"error"`` and ``message`` describes the failure.
          """
          if not session or not question:
              return {"status": "error", "message": "缺少参数"}

          data_set = []
          transactions_df = None
          logger.info("传入的数据文件路径: %s", data_set_file)
          if not data_set_file:
              logger.info("未传入数据集文件")
          elif not os.path.exists(data_set_file):
              logger.error("数据文件:%s 不存在,请检查路径!", data_set_file)
          else:
              try:
                  data_set, transactions_df = self._load_data_set(data_set_file)
              except Exception as e:
                  logger.exception("读取数据文件:%s异常", data_set_file)
                  return {
                      "status": "error",
                      "message": f"读取数据文件:{data_set_file}异常,{str(e)}",
                  }

          result = {"status": "success"}
          try:
              config = {"configurable": {"thread_id": session}}
              checkpoint = self.memory.get(config)
              channel_values = checkpoint.get("channel_values", {}) if checkpoint else {}
              if not data_set:  # no dataset given: reuse the session's dataset
                  data_set = channel_values.get("data_set", [])
              graph_input = {
                  "session": session,
                  "question": question,
                  "data_set": data_set,
                  "history_messages": channel_values.get("history_messages", []),
                  # Reset per-turn outputs so a failing node cannot surface the
                  # previous turn's checkpointed answer or status.
                  "answer": "",
                  "status": "success",
                  "message": "",
              }
              if transactions_df is not None:
                  graph_input["transactions_df"] = transactions_df
              response = self.graph.invoke(graph_input, config)
              result["answer"] = response.get("answer", "")
              if response.get("status") == "error":
                  result["status"] = "error"
                  result["message"] = response.get("message", "")
          except Exception as e:
              logger.exception("用户对话失败,异常:%s", e)
              result["status"] = "error"
              result["message"] = str(e)
          return result
  # ==================== Build the agent graph ====================
  def create_bank_statement_agent():
      """Create the legacy upload/query bank-statement agent graph.

      The entry node ``waiting_for_input`` passes the state through unchanged.
      ``route_query`` then picks ``process_upload``, ``analyze_query`` or
      ``waiting_for_input``, and both worker nodes loop back to
      ``waiting_for_input``.

      NOTE(review): ``analyze_query`` and ``route_query`` are not defined on
      TxFlowAnalysisAgent in this file, and the graph has no edge to END.
      ``process_upload`` is also a coroutine, so the graph would need ``ainvoke``.
      This function checks for the node methods before constructing the agent
      (and its LLM client) so the failure is immediate and explicit.

      :return: the compiled graph
      :raises AttributeError: if TxFlowAnalysisAgent lacks a required node method
      """
      required = ("process_upload", "analyze_query", "route_query")
      missing = [
          name for name in required
          if not callable(getattr(TxFlowAnalysisAgent, name, None))
      ]
      if missing:
          raise AttributeError(f"TxFlowAnalysisAgent 缺少节点方法: {', '.join(missing)}")

      nodes = TxFlowAnalysisAgent()
      workflow = StateGraph(AgentState)
      workflow.add_node("process_upload", nodes.process_upload)
      workflow.add_node("analyze_query", nodes.analyze_query)
      # Pass-through ("waiting for input") node: returns the state unchanged.
      workflow.add_node("waiting_for_input", lambda state: state)
      workflow.set_entry_point("waiting_for_input")
      workflow.add_conditional_edges(
          "waiting_for_input",
          nodes.route_query,
          {
              "process_upload": "process_upload",
              "analyze_query": "analyze_query",
              "waiting_for_input": "waiting_for_input",
          },
      )
      workflow.add_edge("process_upload", "waiting_for_input")
      workflow.add_edge("analyze_query", "waiting_for_input")
      return workflow.compile()
  if __name__ == '__main__':
      import sys

      # Dataset file: first command-line argument, falling back to the
      # developer's local sample dataset.
      DEFAULT_DATA_FILE = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json"
      data_file = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DATA_FILE

      agent = TxFlowAnalysisAgent()
      questions = [
          "你好",
          "查询交易日期是2023-01-05对应的收入记录",
          "查询交易对手是绿源农产品公司的记录",
      ]
      for question in questions:
          result = agent.ask_question(session="s1", question=question, data_set_file=data_file)
          print(f"问题:{question}, 响应:{result}")

      # More sample questions covering the other intents (not run by default):
      # "查找转给贾强的交易记录", "花了多少钱", "统计用户总收入", "生成分析报告",
      # "转账给张三", "生成一份异常分析报告"
  499. # question = "生成一份异常分析报告"
  500. # result = agent.ask_question(session="s1", question=question)
  501. # print(f"问题:{question}, 意图:{result}")