"""Command-line entry point for the transaction-statement report generator.

The module loads transaction records (built-in samples or a JSON file),
validates them through ``DataManager``, runs the report-generation graph and
prints/saves the resulting report.
"""
import asyncio
import hashlib
import json
import os
import sys
import traceback
from typing import Any, Dict, List, Optional

from llmops.agents.graph import create_report_generation_graph
from llmops.agents.data_manager import DataManager
from llmops.agents.state import create_initial_state

# Safety valve for the streaming loop in ``run_report_generation``.
_MAX_STREAM_STEPS = 100


# ============= Data loading =============

def load_sample_data() -> List[Dict[str, Any]]:
    """Return a small list of built-in sample records with the full field set."""
    return [
        {
            "txId": "TX202512310001",
            "txDate": "2025-12-31",
            "txTime": "13:55",
            "txAmount": 3100,
            "txBalance": 245600,
            "txDirection": "收入",
            "txSummary": "中药材服务费",
            "txCounterparty": "康恩贝药业",
            "createdAt": "2025-11-30 05:57",
        },
        {
            "txId": "TX202512310002",
            "txDate": "2025-12-30",
            "txTime": "09:20",
            "txAmount": 15000,
            "txBalance": 242500,
            "txDirection": "收入",
            "txSummary": "货款结算",
            "txCounterparty": "同仁堂医药",
            "createdAt": "2025-11-29 18:23",
        },
        # ... more sample records
    ]


def load_data_from_file(file_path: str) -> List[Dict[str, Any]]:
    """Load and validate transaction records from a JSON file.

    Args:
        file_path: Path of the JSON file to load.

    Returns:
        The list of transaction records returned by
        ``DataManager.load_from_file``.

    Raises:
        ValueError: On any failure (missing file, unreadable content, schema
            validation errors). The original exception is attached as
            ``__cause__`` so the root cause stays visible in tracebacks.
    """
    try:
        print(f"📁 加载数据文件: {file_path}")
        data, _df = DataManager.load_from_file(file_path)

        # Reject the file early if it does not match the expected schema.
        is_valid, errors = DataManager.validate_data_schema(data)
        if not is_valid:
            raise ValueError(f"数据验证失败: {'; '.join(errors)}")

        # Print a summary of what was loaded.
        print(DataManager.format_data_summary(data))
        return data
    except Exception as e:
        # Callers rely on a single exception type; chain to keep the cause.
        raise ValueError(f"数据加载失败: {e}") from e


def create_sample_file(sample_file: str = "sample_transactions.json") -> str:
    """Write the built-in sample records to a JSON file.

    Args:
        sample_file: Destination path; defaults to ``sample_transactions.json``
            in the current working directory.

    Returns:
        The path that was written (as given, not made absolute).
    """
    sample_data = load_sample_data()
    with open(sample_file, 'w', encoding='utf-8') as f:
        json.dump(sample_data, f, ensure_ascii=False, indent=2)
    print(f"✅ 示例文件已创建: {sample_file}")
    print(f"📍 文件位置: {os.path.abspath(sample_file)}")
    return sample_file


# ============= Report generation =============

def _default_session_id(requirement: str) -> str:
    """Derive a session id from the requirement text, stable across runs.

    The built-in ``hash()`` of a ``str`` is salted per interpreter process,
    so it cannot be used for an id that should be reproducible.
    """
    digest = hashlib.sha256(requirement.encode("utf-8")).hexdigest()
    return f"report_session_{int(digest, 16) % 10000}"


def _print_node_update(step: int, node_name: str, node_output: Any) -> None:
    """Print the last message (and metric progress) of one node update."""
    # Updates that are not dicts (e.g. ``None``) carry nothing to print.
    if not isinstance(node_output, dict):
        return
    messages = node_output.get("messages")
    if not messages:
        return

    message = messages[-1]
    if isinstance(message, tuple):
        # Tuple messages are expected to be (role, content) pairs.
        _role, content = message
        print(f"📍 Step {step:02d} | {node_name}: {content}")

        # Progress tracking for the metric pipeline.
        if "metrics_requirements" in node_output:
            reqs = node_output["metrics_requirements"]
            print(f" 📊 指标需求: {[m.metric_id for m in reqs]}")
        if "computed_metrics" in node_output:
            computed = node_output["computed_metrics"]
            print(f" ✅ 已计算: {list(computed.keys())}")
        if "pending_metric_ids" in node_output:
            pending = node_output["pending_metric_ids"]
            print(f" ⏳ 待计算: {pending}")
    elif isinstance(message, str):
        print(f"📍 Step {step:02d} | {node_name}: {message}")


async def run_report_generation(
    requirement: str,
    data: List[Dict[str, Any]],
    session_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Run the report-generation graph and return the final report.

    Args:
        requirement: Natural-language description of the requested report.
        data: Transaction records; validated with
            ``DataManager.validate_data_schema`` before the graph runs.
        session_id: Identifier used as the graph's ``thread_id``. When
            omitted, a stable id is derived from ``requirement``.

    Returns:
        The value stored under ``"answer"`` in the graph's final state.

    Raises:
        ValueError: If the data fails validation, the final state has no
            ``"answer"`` key, or the report is a dict containing ``"error"``.
        Exception: Any error raised while streaming the graph is printed
            with its traceback and re-raised unchanged.
    """
    # Validate the input before spending any work on the graph.
    print("\n🔍 验证数据格式...")
    is_valid, errors = DataManager.validate_data_schema(data)
    if not is_valid:
        raise ValueError(f"数据验证失败: {'; '.join(errors)}")

    # 1. Build the initial state through create_initial_state.
    if session_id is None:
        session_id = _default_session_id(requirement)
    initial_state = create_initial_state(
        question=requirement,
        data=data,
        session_id=session_id,
    )

    # 2. Build the graph.
    graph = create_report_generation_graph()

    # 3. Run configuration.
    config = {
        "configurable": {"thread_id": initial_state["session_id"]},
        "recursion_limit": 50,
    }

    print(f"\n🚀 开始报告生成流程 (会话: {session_id})...\n")

    # 4. Execute the graph and print each node update as it streams in.
    step = 1
    try:
        async for event in graph.astream(initial_state, config, stream_mode="updates"):
            # Each event maps node names to that node's update; handle every
            # entry instead of assuming there is exactly one.
            for node_name, node_output in event.items():
                _print_node_update(step, node_name, node_output)
                step += 1

            # Safety check against a graph that never terminates.
            if step > _MAX_STREAM_STEPS:
                print(f"⚠️ 达到最大步数限制({_MAX_STREAM_STEPS}),强制终止")
                break
    except Exception as e:
        print(f"❌ 图执行异常: {e}")
        traceback.print_exc()
        raise

    # 5. Fetch the final state.
    # NOTE(review): the short sleep presumably lets pending state writes
    # settle before reading — confirm whether it is still needed.
    await asyncio.sleep(0.5)
    final_state = await graph.aget_state(config)
    values = final_state.values

    # 6. Debug output and completeness check.
    print(f"\nDebug: 最终状态键 = {list(values.keys())}")
    print(f"Debug: is_complete = {values.get('is_complete')}")
    print(f"Debug: 执行步数 = {step - 1}")

    if "answer" not in values:
        print(f"❌ 错误:最终状态中缺少 'answer' 键")
        print(f"大纲状态: {values.get('outline_draft')}")
        print(f"最后路由: {values.get('next_route')}")
        raise ValueError(
            f"报告生成未完成:缺少 'answer' 键\n"
            f"最后路由: {values.get('next_route')}\n"
            f"大纲草稿: {values.get('outline_draft') is not None}\n"
            f"已计算指标: {len(values.get('computed_metrics', {}))}"
        )

    report = values["answer"]

    # 7. A dict report carrying an "error" key signals a failed generation.
    if isinstance(report, dict) and "error" in report:
        raise ValueError(f"报告生成出错: {report['error']}")

    return report


# ============= Command-line helpers =============

def print_usage() -> None:
    """Print the command-line usage and the expected data-file format."""
    # NOTE(review): the text below documents createdAt with seconds, while
    # the built-in sample records omit them — confirm which one the
    # validator actually expects.
    print("""
使用说明:
  python main.py                         # 运行主程序(需配置文件路径)
  python main.py --create-sample         # 创建示例数据文件
  python main.py --help                  # 显示帮助信息
  python main.py --validate file.json    # 验证数据文件格式

数据文件要求:
  JSON数组,每条记录包含以下字段:
  - txId: 交易ID (字符串)
  - txDate: 交易日期 (YYYY-MM-DD)
  - txTime: 交易时间 (HH:MM)
  - txAmount: 交易金额 (数值)
  - txBalance: 交易后余额 (数值)
  - txDirection: 交易方向 (收入/支出)
  - txSummary: 交易摘要 (字符串)
  - txCounterparty: 交易对手 (字符串)
  - createdAt: 记录创建时间 (YYYY-MM-DD HH:MM:SS)
""")


def validate_file(file_path: str) -> bool:
    """Check that a data file loads and validates.

    Args:
        file_path: Path of the JSON file to check.

    Returns:
        ``True`` when ``load_data_from_file`` succeeds, ``False`` otherwise
        (the failure reason is printed, not raised).
    """
    try:
        print(f"🔍 验证文件: {file_path}")
        load_data_from_file(file_path)
        print("✅ 文件验证通过!")
        return True
    except Exception as e:
        print(f"❌ 验证失败: {e}")
        return False


def handle_command_line() -> None:
    """Dispatch on ``sys.argv``: run the main flow or one of the utilities."""
    if len(sys.argv) == 1:
        asyncio.run(main())
    elif sys.argv[1] == "--create-sample":
        create_sample_file()
    elif sys.argv[1] == "--help":
        print_usage()
    elif sys.argv[1] == "--validate" and len(sys.argv) > 2:
        validate_file(sys.argv[2])
    else:
        print("未知参数,使用 --help 查看帮助")


async def main() -> None:
    """Interactive demo: load data, generate the report, print and save it."""
    print("=" * 70)
    print(" 交易流水报告生成系统 (支持完整字段格式)")
    print("=" * 70)

    # ===== Configuration =====
    # True: use the built-in sample records; False: load a real file.
    use_sample_data = False

    if use_sample_data:
        print("\n📊 使用内置示例数据...")
        data = load_sample_data()
        requirement = "生成2025年12月交易分析报告,重点关注收入支出分布、主要交易对手、余额变化趋势和大额交易"
    else:
        # Change this to the path of your own data file.
        file_path = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json"

        # Offer to fall back to a generated sample file when it is missing.
        if not os.path.exists(file_path):
            print(f"\n⚠️ 文件不存在: {file_path}")
            response = input("是否创建示例文件? (y/n): ")
            if response.lower() == 'y':
                file_path = create_sample_file()
            else:
                print("\n💡 提示: 请将你的JSON文件路径赋值给 file_path 变量")
                return

        try:
            print(f"\n📁 从文件加载数据: {file_path}")
            data = load_data_from_file(file_path)
            # An empty answer falls back to the default requirement.
            requirement = (
                input("\n请输入报告需求: ")
                or "生成交易分析报告,包含收入趋势、主要客户、大额交易和余额分析"
            )
        except Exception as e:
            print(f"\n❌ 数据加载失败: {e}")
            return

    # Run the report generation.
    try:
        report = await run_report_generation(
            requirement=requirement,
            data=data,
            session_id="demo_session_001",
        )

        # Print the report.
        print("\n" + "=" * 70)
        print("📊 最终生成报告")
        print("=" * 70)
        print(json.dumps(report, ensure_ascii=False, indent=2))

        # Save the report next to the current working directory.
        output_file = "generated_report.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"\n💾 报告已保存到: {os.path.abspath(output_file)}")
    except Exception as e:
        print(f"\n❌ 报告生成失败: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    handle_command_line()