# Command-line entry point for the transaction-statement report generator.
import asyncio
import json
import os
import sys
import traceback
from typing import Any, Dict, List, Optional

from llmops.agents.data_manager import DataManager
from llmops.agents.graph import create_report_generation_graph
from llmops.agents.state import create_initial_state
# ============= Data loading helpers =============
def load_sample_data() -> List[Dict[str, Any]]:
    """Return the built-in sample transactions in the full field layout.

    Every record is a dict with the keys ``txId``, ``txDate``, ``txTime``,
    ``txAmount``, ``txBalance``, ``txDirection``, ``txSummary``,
    ``txCounterparty`` and ``createdAt``.

    Returns:
        A newly built list of sample records on every call, so callers may
        mutate the result without affecting later calls.
    """
    return [
        {
            "txId": "TX202512310001",
            "txDate": "2025-12-31",
            "txTime": "13:55",
            "txAmount": 3100,
            "txBalance": 245600,
            "txDirection": "收入",
            "txSummary": "中药材服务费",
            "txCounterparty": "康恩贝药业",
            "createdAt": "2025-11-30 05:57",
        },
        {
            "txId": "TX202512310002",
            "txDate": "2025-12-30",
            "txTime": "09:20",
            "txAmount": 15000,
            "txBalance": 242500,
            "txDirection": "收入",
            "txSummary": "货款结算",
            "txCounterparty": "同仁堂医药",
            "createdAt": "2025-11-29 18:23",
        },
        # ... more sample records can be appended here
    ]
def load_data_from_file(file_path: str) -> List[Dict[str, Any]]:
    """Load transaction records from a JSON file and validate them.

    The file is read through ``DataManager.load_from_file``, checked with
    ``DataManager.validate_data_schema`` and, on success, a summary produced
    by ``DataManager.format_data_summary`` is printed.

    Args:
        file_path: Path of the JSON file to load.

    Returns:
        The list of transaction records read from the file.

    Raises:
        ValueError: For every failure in this function (missing file,
            unreadable content, schema violations, ...). Any underlying
            exception is wrapped; the original is kept as ``__cause__``.
    """
    try:
        print(f"📁 加载数据文件: {file_path}")
        # load_from_file returns a pair; the second element (called ``df``
        # upstream, presumably a DataFrame) is not needed here.
        data, _ = DataManager.load_from_file(file_path)

        # Validate data quality before handing the records to callers.
        is_valid, errors = DataManager.validate_data_schema(data)
        if not is_valid:
            raise ValueError(f"数据验证失败: {'; '.join(errors)}")

        # Print a short summary of the loaded data.
        print(DataManager.format_data_summary(data))
        return data
    except Exception as e:
        # Keep the single ValueError contract, but chain the cause so the
        # original traceback is not lost.
        raise ValueError(f"数据加载失败: {e}") from e
def create_sample_file() -> None:
    """Write the built-in sample transactions to ``sample_transactions.json``.

    The file is created (or overwritten) relative to the current working
    directory as UTF-8 JSON with non-ASCII characters kept readable, and its
    absolute location is printed afterwards.
    """
    sample_file = "sample_transactions.json"
    sample_data = load_sample_data()

    with open(sample_file, "w", encoding="utf-8") as f:
        json.dump(sample_data, f, ensure_ascii=False, indent=2)

    print(f"✅ 示例文件已创建: {sample_file}")
    print(f"📍 文件位置: {os.path.abspath(sample_file)}")
def _print_node_update(step: int, node_name: str, node_output: Any) -> None:
    """Print the latest message (and metric progress) of one streamed node update."""
    # A node may emit ``None`` or a non-dict payload; there is nothing to print then.
    if not isinstance(node_output, dict):
        return
    messages = node_output.get("messages")
    if not messages:
        return

    message = messages[-1]
    if isinstance(message, tuple):
        # Tuple messages are unpacked as (role, content); only content is shown.
        _role, content = message
        print(f"📍 Step {step:02d} | {node_name}: {content}")

        # Progress tracking for the metric pipeline.
        if "metrics_requirements" in node_output:
            reqs = node_output["metrics_requirements"]
            print(f" 📊 指标需求: {[m.metric_id for m in reqs]}")
        if "computed_metrics" in node_output:
            computed = node_output["computed_metrics"]
            print(f" ✅ 已计算: {list(computed.keys())}")
        if "pending_metric_ids" in node_output:
            pending = node_output["pending_metric_ids"]
            print(f" ⏳ 待计算: {pending}")
    elif isinstance(message, str):
        print(f"📍 Step {step:02d} | {node_name}: {message}")


async def run_report_generation(
    requirement: str,
    data: List[Dict[str, Any]],
    session_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Run the report-generation graph and return the generated report.

    The data is validated, an initial state is built with
    ``create_initial_state``, the graph from
    ``create_report_generation_graph`` is streamed while progress is printed,
    and the ``answer`` entry of the final graph state is returned.

    Args:
        requirement: Natural-language description of the report to generate.
        data: Transaction records to analyse.
        session_id: Identifier used for the session; when ``None`` one is
            derived from ``requirement``.

    Returns:
        The value stored under ``answer`` in the final graph state.

    Raises:
        ValueError: If the data fails schema validation, if the final state
            has no ``answer`` key, or if the report is a dict containing an
            ``error`` key.
        Exception: Any exception raised while streaming the graph is printed
            with its traceback and re-raised unchanged.
    """
    # Validate the input before doing any work.
    print("\n🔍 验证数据格式...")
    is_valid, errors = DataManager.validate_data_schema(data)
    if not is_valid:
        raise ValueError(f"数据验证失败: {'; '.join(errors)}")

    # 1. Build the initial state through create_initial_state.
    if session_id is None:
        # NOTE(review): str hashing is salted per process unless
        # PYTHONHASHSEED is fixed, so this fallback id is not stable across
        # runs and has only 10000 possible values - confirm that is acceptable.
        session_id = f"report_session_{hash(requirement) % 10000}"

    initial_state = create_initial_state(
        question=requirement,
        data=data,
        session_id=session_id,
    )

    # 2. Create the graph.
    graph = create_report_generation_graph()

    # 3. Runtime configuration.
    config = {
        "configurable": {"thread_id": initial_state["session_id"]},
        "recursion_limit": 50,
    }

    print(f"\n🚀 开始报告生成流程 (会话: {session_id})...\n")

    # 4. Execute the graph and print streamed updates.
    step = 1
    try:
        async for event in graph.astream(initial_state, config, stream_mode="updates"):
            # Each event maps node names to that node's update.
            for node_name, node_output in event.items():
                _print_node_update(step, node_name, node_output)

            step += 1

            # Safety net against a runaway loop.
            if step > 100:
                print("⚠️ 达到最大步数限制(100),强制终止")
                break
    except Exception as e:
        print(f"❌ 图执行异常: {e}")
        traceback.print_exc()
        raise

    # 5. Short pause kept from the original flow, then read the final state.
    await asyncio.sleep(0.5)
    final_state = await graph.aget_state(config)
    values = final_state.values

    # 6. Debug output and error handling.
    print(f"\nDebug: 最终状态键 = {list(values.keys())}")
    print(f"Debug: is_complete = {values.get('is_complete')}")
    print(f"Debug: 执行步数 = {step - 1}")

    if "answer" not in values:
        print("❌ 错误:最终状态中缺少 'answer' 键")
        print(f"大纲状态: {values.get('outline_draft')}")
        print(f"最后路由: {values.get('next_route')}")
        raise ValueError(
            f"报告生成未完成:缺少 'answer' 键\n"
            f"最后路由: {values.get('next_route')}\n"
            f"大纲草稿: {values.get('outline_draft') is not None}\n"
            f"已计算指标: {len(values.get('computed_metrics', {}))}"
        )

    report = values["answer"]

    # 7. A dict report carrying an "error" key signals a failed generation.
    if isinstance(report, dict) and "error" in report:
        raise ValueError(f"报告生成出错: {report['error']}")

    return report
def print_usage() -> None:
    """Print the command-line usage and the expected data-file fields."""
    # NOTE(review): the text below documents createdAt as
    # "YYYY-MM-DD HH:MM:SS" while the built-in sample rows use
    # "YYYY-MM-DD HH:MM" - confirm which format the validator accepts.
    usage_lines = (
        "",
        "使用说明:",
        "python main.py # 运行主程序(需配置文件路径)",
        "python main.py --create-sample # 创建示例数据文件",
        "python main.py --help # 显示帮助信息",
        "python main.py --validate file.json # 验证数据文件格式",
        "数据文件要求:",
        "JSON数组,每条记录包含以下字段:",
        "- txId: 交易ID (字符串)",
        "- txDate: 交易日期 (YYYY-MM-DD)",
        "- txTime: 交易时间 (HH:MM)",
        "- txAmount: 交易金额 (数值)",
        "- txBalance: 交易后余额 (数值)",
        "- txDirection: 交易方向 (收入/支出)",
        "- txSummary: 交易摘要 (字符串)",
        "- txCounterparty: 交易对手 (字符串)",
        "- createdAt: 记录创建时间 (YYYY-MM-DD HH:MM:SS)",
        "",
    )
    # Leading/trailing empty entries reproduce the blank lines around the text.
    print("\n".join(usage_lines))
def validate_file(file_path: str) -> bool:
    """Check whether a data file loads and validates successfully.

    Args:
        file_path: Path of the JSON data file to check.

    Returns:
        ``True`` when ``load_data_from_file`` succeeds, ``False`` when it
        raises; the failure reason is printed instead of propagated.
    """
    try:
        print(f"🔍 验证文件: {file_path}")
        # Only success or failure matters here; the loaded records are discarded.
        load_data_from_file(file_path)
    except Exception as e:
        print(f"❌ 验证失败: {e}")
        return False
    else:
        print("✅ 文件验证通过!")
        return True
def handle_command_line() -> None:
    """Dispatch on the command-line arguments.

    Without arguments the async ``main`` is run; ``--create-sample`` writes
    the sample file, ``--help`` prints the usage text, and
    ``--validate <file>`` validates a data file. Anything else (including
    ``--validate`` without a file) prints a hint pointing to ``--help``.
    """
    args = sys.argv[1:]

    if not args:
        asyncio.run(main())
    elif args[0] == "--create-sample":
        create_sample_file()
    elif args[0] == "--help":
        print_usage()
    elif args[0] == "--validate" and len(args) > 1:
        validate_file(args[1])
    else:
        print("未知参数,使用 --help 查看帮助")
async def main(
    use_sample_data: bool = False,
    file_path: str = "/Applications/work/宇信科技/知识沉淀平台/原始数据-流水分析-农业原始数据.json",
) -> None:
    """Load transaction data, generate a report, print it and save it.

    Args:
        use_sample_data: When ``True`` the built-in sample records and a
            fixed requirement are used; otherwise data is read from
            ``file_path`` and the requirement is asked for interactively.
        file_path: JSON data file to load when ``use_sample_data`` is
            ``False``. If it does not exist, the user is offered to create
            and use ``sample_transactions.json`` instead.

    The report is printed as JSON and written to ``generated_report.json``
    in the current working directory. Failures are printed rather than
    raised, so the coroutine always returns ``None``.
    """
    print("=" * 70)
    print(" 交易流水报告生成系统 (支持完整字段格式)")
    print("=" * 70)

    if use_sample_data:
        print("\n📊 使用内置示例数据...")
        data = load_sample_data()
        requirement = "生成2025年12月交易分析报告,重点关注收入支出分布、主要交易对手、余额变化趋势和大额交易"
    else:
        # Offer to fall back to a generated sample file when the path is missing.
        if not os.path.exists(file_path):
            print(f"\n⚠️ 文件不存在: {file_path}")
            response = input("是否创建示例文件? (y/n): ")
            if response.lower() == "y":
                create_sample_file()
                file_path = "sample_transactions.json"
            else:
                print("\n💡 提示: 请将你的JSON文件路径赋值给 file_path 变量")
                return

        try:
            print(f"\n📁 从文件加载数据: {file_path}")
            data = load_data_from_file(file_path)
            # An empty answer falls back to the default requirement.
            requirement = input("\n请输入报告需求: ") or \
                "生成交易分析报告,包含收入趋势、主要客户、大额交易和余额分析"
        except Exception as e:
            print(f"\n❌ 数据加载失败: {e}")
            return

    # Run the report generation.
    try:
        report = await run_report_generation(
            requirement=requirement,
            data=data,
            session_id="demo_session_001",
        )

        # Print the generated report.
        print("\n" + "=" * 70)
        print("📊 最终生成报告")
        print("=" * 70)
        print(json.dumps(report, ensure_ascii=False, indent=2))

        # Persist the report next to the current working directory.
        output_file = "generated_report.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"\n💾 报告已保存到: {os.path.abspath(output_file)}")
    except Exception as e:
        print(f"\n❌ 报告生成失败: {e}")
        traceback.print_exc()
# Script entry point: only dispatch on CLI arguments when run directly.
if __name__ == "__main__":
    handle_command_line()