main.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. import asyncio
  2. import json
  3. import os
  4. import sys
  5. from typing import List, Dict, Any
  6. from llmops.agents.graph import create_report_generation_graph
  7. from llmops.agents.data_manager import DataManager
  8. from llmops.agents.state import create_initial_state
  9. # ============= 数据加载方法 =============
  10. def load_sample_data() -> List[Dict[str, Any]]:
  11. """加载符合完整字段格式的示例数据"""
  12. return [
  13. {
  14. "txId": "TX202512310001",
  15. "txDate": "2025-12-31",
  16. "txTime": "13:55",
  17. "txAmount": 3100,
  18. "txBalance": 245600,
  19. "txDirection": "收入",
  20. "txSummary": "中药材服务费",
  21. "txCounterparty": "康恩贝药业",
  22. "createdAt": "2025-11-30 05:57"
  23. },
  24. {
  25. "txId": "TX202512310002",
  26. "txDate": "2025-12-30",
  27. "txTime": "09:20",
  28. "txAmount": 15000,
  29. "txBalance": 242500,
  30. "txDirection": "收入",
  31. "txSummary": "货款结算",
  32. "txCounterparty": "同仁堂医药",
  33. "createdAt": "2025-11-29 18:23"
  34. },
  35. # ... 更多示例数据
  36. ]
  37. def load_data_from_file(file_path: str) -> List[Dict[str, Any]]:
  38. """
  39. 从JSON文件加载交易流水数据
  40. 参数:
  41. file_path: JSON文件的完整路径
  42. 返回:
  43. 交易流水数据列表
  44. 异常:
  45. FileNotFoundError: 文件不存在
  46. ValueError: 数据格式错误或不符合要求
  47. """
  48. try:
  49. print(f"📁 加载数据文件: {file_path}")
  50. data, df = DataManager.load_from_file(file_path)
  51. # 验证数据质量
  52. is_valid, errors = DataManager.validate_data_schema(data)
  53. if not is_valid:
  54. raise ValueError(f"数据验证失败: {'; '.join(errors)}")
  55. # 打印数据摘要
  56. print(DataManager.format_data_summary(data))
  57. return data
  58. except Exception as e:
  59. raise ValueError(f"数据加载失败: {e}")
  60. def create_sample_file():
  61. """创建符合新格式的示例数据文件"""
  62. sample_file = "sample_transactions.json"
  63. sample_data = load_sample_data()
  64. with open(sample_file, 'w', encoding='utf-8') as f:
  65. json.dump(sample_data, f, ensure_ascii=False, indent=2)
  66. print(f"✅ 示例文件已创建: {sample_file}")
  67. print(f"📍 文件位置: {os.path.abspath(sample_file)}")
  68. async def run_report_generation(
  69. requirement: str,
  70. data: List[Dict[str, Any]],
  71. session_id: str = None
  72. ) -> Dict[str, Any]:
  73. """运行报告生成流程"""
  74. # 验证和格式化数据
  75. print(f"\n🔍 验证数据格式...")
  76. is_valid, errors = DataManager.validate_data_schema(data)
  77. if not is_valid:
  78. raise ValueError(f"数据验证失败: {'; '.join(errors)}")
  79. # 1. 初始化状态 - 关键修复:使用create_initial_state
  80. if session_id is None:
  81. session_id = f"report_session_{hash(requirement) % 10000}"
  82. # 关键修复:使用清理后的初始状态
  83. initial_state = create_initial_state(
  84. question=requirement,
  85. data=data,
  86. session_id=session_id
  87. )
  88. # 2. 创建图
  89. graph = create_report_generation_graph()
  90. # 3. 配置
  91. config = {
  92. "configurable": {"thread_id": initial_state["session_id"]},
  93. "recursion_limit": 50 # 增加到50
  94. }
  95. print(f"\n🚀 开始报告生成流程 (会话: {session_id})...\n")
  96. # 4. 执行并流式打印
  97. step = 1
  98. try:
  99. async for event in graph.astream(initial_state, config, stream_mode="updates"):
  100. node_name = list(event.keys())[0]
  101. node_output = event[node_name]
  102. if "messages" in node_output and node_output["messages"]:
  103. message = node_output["messages"][-1]
  104. if isinstance(message, tuple):
  105. role, content = message
  106. print(f"📍 Step {step:02d} | {node_name}: {content}")
  107. # 添加状态跟踪
  108. if "metrics_requirements" in node_output:
  109. reqs = node_output["metrics_requirements"]
  110. print(f" 📊 指标需求: {[m.metric_id for m in reqs]}")
  111. if "computed_metrics" in node_output:
  112. computed = node_output["computed_metrics"]
  113. print(f" ✅ 已计算: {list(computed.keys())}")
  114. if "pending_metric_ids" in node_output:
  115. pending = node_output["pending_metric_ids"]
  116. print(f" ⏳ 待计算: {pending}")
  117. elif isinstance(message, str):
  118. print(f"📍 Step {step:02d} | {node_name}: {message}")
  119. step += 1
  120. # 安全检查:防止无限循环
  121. if step > 100:
  122. print("⚠️ 达到最大步数限制(100),强制终止")
  123. break
  124. except Exception as e:
  125. print(f"❌ 图执行异常: {e}")
  126. import traceback
  127. traceback.print_exc()
  128. raise
  129. # 5. 等待并获取最终状态
  130. await asyncio.sleep(0.5)
  131. final_state = await graph.aget_state(config)
  132. # 6. 调试信息和错误处理
  133. print(f"\nDebug: 最终状态键 = {list(final_state.values.keys())}")
  134. print(f"Debug: is_complete = {final_state.values.get('is_complete')}")
  135. print(f"Debug: 执行步数 = {step - 1}")
  136. # 检查 answer 是否存在
  137. if "answer" not in final_state.values:
  138. print(f"❌ 错误:最终状态中缺少 'answer' 键")
  139. print(f"大纲状态: {final_state.values.get('outline_draft')}")
  140. print(f"最后路由: {final_state.values.get('next_route')}")
  141. raise ValueError(
  142. f"报告生成未完成:缺少 'answer' 键\n"
  143. f"最后路由: {final_state.values.get('next_route')}\n"
  144. f"大纲草稿: {final_state.values.get('outline_draft') is not None}\n"
  145. f"已计算指标: {len(final_state.values.get('computed_metrics', {}))}"
  146. )
  147. report = final_state.values["answer"]
  148. # 7. 验证报告质量
  149. if isinstance(report, dict) and "error" in report:
  150. raise ValueError(f"报告生成出错: {report['error']}")
  151. return report
  152. def print_usage():
  153. """打印使用说明"""
  154. print("""
  155. 使用说明:
  156. python main.py # 运行主程序(需配置文件路径)
  157. python main.py --create-sample # 创建示例数据文件
  158. python main.py --help # 显示帮助信息
  159. python main.py --validate file.json # 验证数据文件格式
  160. 数据文件要求:
  161. JSON数组,每条记录包含以下字段:
  162. - txId: 交易ID (字符串)
  163. - txDate: 交易日期 (YYYY-MM-DD)
  164. - txTime: 交易时间 (HH:MM)
  165. - txAmount: 交易金额 (数值)
  166. - txBalance: 交易后余额 (数值)
  167. - txDirection: 交易方向 (收入/支出)
  168. - txSummary: 交易摘要 (字符串)
  169. - txCounterparty: 交易对手 (字符串)
  170. - createdAt: 记录创建时间 (YYYY-MM-DD HH:MM:SS)
  171. """)
  172. def validate_file(file_path: str):
  173. """验证数据文件"""
  174. try:
  175. print(f"🔍 验证文件: {file_path}")
  176. data = load_data_from_file(file_path)
  177. print("✅ 文件验证通过!")
  178. return True
  179. except Exception as e:
  180. print(f"❌ 验证失败: {e}")
  181. return False
  182. def handle_command_line():
  183. """处理命令行参数"""
  184. import sys
  185. if len(sys.argv) == 1:
  186. asyncio.run(main())
  187. elif sys.argv[1] == "--create-sample":
  188. create_sample_file()
  189. elif sys.argv[1] == "--help":
  190. print_usage()
  191. elif sys.argv[1] == "--validate" and len(sys.argv) > 2:
  192. validate_file(sys.argv[2])
  193. else:
  194. print("未知参数,使用 --help 查看帮助")
  195. async def main():
  196. """主函数:展示数据加载和报告生成"""
  197. print("=" * 70)
  198. print(" 交易流水报告生成系统 (支持完整字段格式)")
  199. print("=" * 70)
  200. # ===== 配置区 =====
  201. use_sample_data = False # 设为 True 使用示例数据,False 使用真实文件
  202. if use_sample_data:
  203. print("\n📊 使用内置示例数据...")
  204. data = load_sample_data()
  205. requirement = "生成2025年12月交易分析报告,重点关注收入支出分布、主要交易对手、余额变化趋势和大额交易"
  206. else:
  207. # 修改为你的文件路径
  208. file_path = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json"
  209. # 如果文件不存在,创建示例文件
  210. if not os.path.exists(file_path):
  211. print(f"\n⚠️ 文件不存在: {file_path}")
  212. response = input("是否创建示例文件? (y/n): ")
  213. if response.lower() == 'y':
  214. create_sample_file()
  215. file_path = "sample_transactions.json"
  216. else:
  217. print("\n💡 提示: 请将你的JSON文件路径赋值给 file_path 变量")
  218. return
  219. try:
  220. print(f"\n📁 从文件加载数据: {file_path}")
  221. data = load_data_from_file(file_path)
  222. requirement = input("\n请输入报告需求: ") or \
  223. "生成交易分析报告,包含收入趋势、主要客户、大额交易和余额分析"
  224. except Exception as e:
  225. print(f"\n❌ 数据加载失败: {e}")
  226. return
  227. # 运行报告生成
  228. try:
  229. report = await run_report_generation(
  230. requirement=requirement,
  231. data=data,
  232. session_id="demo_session_001"
  233. )
  234. # 打印报告摘要
  235. print("\n" + "=" * 70)
  236. print("📊 最终生成报告")
  237. print("=" * 70)
  238. print(json.dumps(report, ensure_ascii=False, indent=2))
  239. # 保存报告
  240. output_file = "generated_report.json"
  241. with open(output_file, 'w', encoding='utf-8') as f:
  242. json.dump(report, f, ensure_ascii=False, indent=2)
  243. print(f"\n💾 报告已保存到: {os.path.abspath(output_file)}")
  244. except Exception as e:
  245. print(f"\n❌ 报告生成失败: {e}")
  246. import traceback
  247. traceback.print_exc()
  248. if __name__ == "__main__":
  249. handle_command_line()