batch_runner.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. #!/usr/bin/env python3
  2. """
  3. 批量运行器 - 批量执行Complete Agent Flow
  4. ========================================
  5. 此脚本可以批量运行多次完整的智能体工作流,每次运行会创建独立的日志文件夹。
  6. 使用方法:
  7. python batch_runner.py
  8. 配置参数:
  9. - 运行次数: RUNS = 10
  10. - 行业: INDUSTRY = "农业"
  11. - 数据文件: DATA_FILE = "data_files/交易流水样例数据.csv"
  12. - 查询问题: QUESTION = "请生成一份详细的农业经营贷流水分析报告..."
  13. 文件夹结构:
  14. api_results_1/ # 第一次运行的日志
  15. api_results_2/ # 第二次运行的日志
  16. ...
  17. api_results_10/ # 第十次运行的日志
  18. 作者: Big Agent Team
  19. 版本: 1.0.0
  20. 创建时间: 2024-12-20
  21. """
  22. import asyncio
  23. import os
  24. from datetime import datetime
  25. from typing import List, Dict, Any
  26. import sys
  27. import os
  28. # 添加项目根目录到路径,以便导入config
  29. current_dir = os.path.dirname(os.path.abspath(__file__))
  30. parent_dir = os.path.dirname(current_dir)
  31. sys.path.insert(0, parent_dir)
  32. # 根据执行方式选择导入方式
  33. if __name__ == "__main__":
  34. # 直接执行文件时,使用绝对导入
  35. from llmops.complete_agent_flow_rule import run_complete_agent_flow
  36. from llmops.agents.data_manager import DataManager
  37. else:
  38. # 作为模块导入时,使用相对导入
  39. from .complete_agent_flow_rule import run_complete_agent_flow
  40. from .agents.data_manager import DataManager
  41. import config
  42. # ========== 配置参数 ==========
  43. RUNS = 2 # 运行次数
  44. INDUSTRY = "农业" # 行业
  45. DATA_FILE = "data_files/交易流水样例数据.csv" # 数据文件路径
  46. QUESTION = "请生成一份详细的农业经营贷流水分析报告,需要包含:1.总收入和总支出统计 2.收入笔数和支出笔数 3.各类型收入支出占比分析 4.交易对手收入支出TOP3排名 5.按月份的收入支出趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标" # 分析查询
  47. # ==============================
  48. async def run_single_flow(run_id: str, question: str, industry: str, data: List[Dict[str, Any]], file_name: str) -> Dict[str, Any]:
  49. """
  50. 运行单个工作流实例
  51. Args:
  52. run_id: 运行ID
  53. question: 用户查询
  54. industry: 行业
  55. data: 数据集
  56. file_name: 文件名
  57. Returns:
  58. 运行结果
  59. """
  60. print(f"\n{'='*60}")
  61. print(f"🚀 开始运行 #{run_id}")
  62. print(f"📁 日志文件夹: api_results_{run_id}")
  63. print(f"{'='*60}")
  64. try:
  65. result = await run_complete_agent_flow(
  66. question=question,
  67. industry=industry,
  68. data=data,
  69. file_name=file_name,
  70. api_key=config.DEEPSEEK_API_KEY,
  71. session_id=f"batch-run-{run_id}",
  72. run_id=run_id
  73. )
  74. if result.get('success'):
  75. summary = result.get('execution_summary', {})
  76. print(f"✅ 运行 #{run_id} 成功完成")
  77. print(f" 规划步骤: {summary.get('planning_steps', 0)}")
  78. print(f" 指标计算: {summary.get('metrics_computed', 0)}")
  79. else:
  80. print(f"❌ 运行 #{run_id} 失败: {result.get('error', '未知错误')}")
  81. return result
  82. except Exception as e:
  83. print(f"❌ 运行 #{run_id} 发生异常: {e}")
  84. return {
  85. "success": False,
  86. "error": str(e),
  87. "run_id": run_id
  88. }
  89. async def run_batch(runs: int, question: str, industry: str, data_file: str):
  90. """
  91. 批量运行工作流
  92. Args:
  93. runs: 运行次数
  94. question: 用户查询
  95. industry: 行业
  96. data_file: 数据文件路径
  97. """
  98. print("🚀 批量运行器启动")
  99. print(f"📊 计划运行次数: {runs}")
  100. print(f"🏭 行业: {industry}")
  101. print(f"📁 数据文件: {data_file}")
  102. print(f"❓ 查询: {question}")
  103. print(f"{'='*80}")
  104. # 检查API密钥
  105. if not config.DEEPSEEK_API_KEY:
  106. print("❌ 未找到API密钥,请检查config.py")
  107. return
  108. # 加载数据
  109. try:
  110. # 如果是相对路径,从项目根目录查找
  111. if not os.path.isabs(data_file):
  112. data_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), data_file)
  113. data = DataManager.load_data_from_csv_file(data_file)
  114. print(f"📊 数据加载成功: {len(data)} 条记录")
  115. except Exception as e:
  116. print(f"❌ 数据加载失败: {e}")
  117. return
  118. # 运行结果统计
  119. successful_runs = 0
  120. failed_runs = 0
  121. results = []
  122. # 逐个运行
  123. for i in range(1, runs + 1):
  124. run_id = str(i)
  125. result = await run_single_flow(run_id, question, industry, data, os.path.basename(data_file))
  126. results.append(result)
  127. if result.get('success'):
  128. successful_runs += 1
  129. else:
  130. failed_runs += 1
  131. # 添加短暂延迟,避免API调用过于频繁
  132. if i < runs: # 最后一次不需要延迟
  133. await asyncio.sleep(1)
  134. # 输出统计结果
  135. print(f"\n{'='*80}")
  136. print("📊 批量运行完成统计")
  137. print(f"{'='*80}")
  138. print(f"总运行次数: {runs}")
  139. print(f"成功次数: {successful_runs}")
  140. print(f"失败次数: {failed_runs}")
  141. print(f"成功率: {successful_runs/runs*100:.1f}%")
  142. # 显示各运行的日志文件夹
  143. print(f"\n📁 日志文件夹列表:")
  144. for i in range(1, runs + 1):
  145. folder_name = f"api_results_{i}"
  146. status = "✅" if results[i-1].get('success') else "❌"
  147. print(f" {status} {folder_name}")
  148. print("\n🎉 批量运行完成!")
  149. print(f"💡 提示: 每次运行的完整日志保存在对应的 api_results_[数字] 文件夹中")
  150. def main():
  151. """主函数"""
  152. print("🚀 使用配置参数运行批量任务")
  153. print(f"📊 运行次数: {RUNS}")
  154. print(f"🏭 行业: {INDUSTRY}")
  155. print(f"📁 数据文件: {DATA_FILE}")
  156. print(f"❓ 查询: {QUESTION[:50]}...")
  157. print("-" * 80)
  158. # 运行批量任务
  159. asyncio.run(run_batch(
  160. runs=RUNS,
  161. question=QUESTION,
  162. industry=INDUSTRY,
  163. data_file=DATA_FILE
  164. ))
  165. if __name__ == "__main__":
  166. main()