#!/usr/bin/env python3
"""
Batch runner - run the Complete Agent Flow in batches
=====================================================
This script runs the complete agent workflow several times in a row; every
run gets its own log folder.

Usage:
    python batch_runner.py

Configuration parameters:
    - Number of runs: RUNS = 10
    - Industry: INDUSTRY = "农业"
    - Data file: DATA_FILE = "data_files/交易流水样例数据.csv"
    - Query: QUESTION = "请生成一份详细的农业经营贷流水分析报告..."

Folder layout:
    api_results_1/   # logs of the first run
    api_results_2/   # logs of the second run
    ...
    api_results_10/  # logs of the tenth run

Author: Big Agent Team
Version: 1.0.0
Created: 2024-12-20
"""
  22. import asyncio
  23. import os
  24. from datetime import datetime
  25. from typing import List, Dict, Any
  26. import sys
  27. import os
# Put the parent of this file's directory at the front of sys.path so that
# `config` can be imported (the original note calls it the project root).
# This must run before the imports below.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
# Choose the import form according to how this module is being loaded.
if __name__ == "__main__":
    # Executed directly as a script: use absolute imports.
    from llmops.complete_agent_flow_rule import run_complete_agent_flow
    from llmops.agents.data_manager import DataManager
else:
    # Imported as a module: use relative imports.
    from .complete_agent_flow_rule import run_complete_agent_flow
    from .agents.data_manager import DataManager
import config
  42. # ========== 配置参数 ==========
  43. RUNS = 10 # 运行次数
  44. INDUSTRY = "农业" # 行业
  45. DATA_FILE = "data_files/交易流水样例数据.csv" # 数据文件路径
  46. QUESTION = "请生成一份详细的农业经营贷流水分析报告,需要包含:1.总收入和总支出统计 2.收入笔数和支出笔数 3.各类型收入支出占比分析 4.交易对手收入支出TOP3排名 5.按月份的收入支出趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标" # 分析查询
  47. # ==============================
  48. async def run_single_flow(run_id: str, question: str, industry: str, data: List[Dict[str, Any]], file_name: str) -> Dict[str, Any]:
  49. """
  50. 运行单个工作流实例
  51. Args:
  52. run_id: 运行ID
  53. question: 用户查询
  54. industry: 行业
  55. data: 数据集
  56. file_name: 文件名
  57. Returns:
  58. 运行结果
  59. """
  60. print(f"\n{'='*60}")
  61. print(f"🚀 开始运行 #{run_id}")
  62. print(f"📁 日志文件夹: api_results_{run_id}")
  63. print(f"{'='*60}")
  64. # 设置环境变量,让所有agent使用正确的文件夹
  65. os.environ['FLOW_RUN_ID'] = run_id
  66. try:
  67. result = await run_complete_agent_flow(
  68. question=question,
  69. industry=industry,
  70. data=data,
  71. file_name=file_name,
  72. api_key=config.DEEPSEEK_API_KEY,
  73. session_id=f"batch-run-{run_id}"
  74. )
  75. if result.get('success'):
  76. summary = result.get('execution_summary', {})
  77. print(f"✅ 运行 #{run_id} 成功完成")
  78. print(f" 规划步骤: {summary.get('planning_steps', 0)}")
  79. print(f" 指标计算: {summary.get('metrics_computed', 0)}")
  80. else:
  81. print(f"❌ 运行 #{run_id} 失败: {result.get('error', '未知错误')}")
  82. return result
  83. except Exception as e:
  84. print(f"❌ 运行 #{run_id} 发生异常: {e}")
  85. return {
  86. "success": False,
  87. "error": str(e),
  88. "run_id": run_id
  89. }
  90. async def run_batch(runs: int, question: str, industry: str, data_file: str):
  91. """
  92. 批量运行工作流
  93. Args:
  94. runs: 运行次数
  95. question: 用户查询
  96. industry: 行业
  97. data_file: 数据文件路径
  98. """
  99. print("🚀 批量运行器启动")
  100. print(f"📊 计划运行次数: {runs}")
  101. print(f"🏭 行业: {industry}")
  102. print(f"📁 数据文件: {data_file}")
  103. print(f"❓ 查询: {question}")
  104. print(f"{'='*80}")
  105. # 检查API密钥
  106. if not config.DEEPSEEK_API_KEY:
  107. print("❌ 未找到API密钥,请检查config.py")
  108. return
  109. # 加载数据
  110. try:
  111. # 如果是相对路径,从项目根目录查找
  112. if not os.path.isabs(data_file):
  113. data_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), data_file)
  114. data = DataManager.load_data_from_csv_file(data_file)
  115. print(f"📊 数据加载成功: {len(data)} 条记录")
  116. except Exception as e:
  117. print(f"❌ 数据加载失败: {e}")
  118. return
  119. # 运行结果统计
  120. successful_runs = 0
  121. failed_runs = 0
  122. results = []
  123. # 运行总时长,秒
  124. total_time = 0
  125. import time
  126. # 逐个运行
  127. for i in range(1, runs + 1):
  128. run_id = str(i)
  129. start_time = time.perf_counter()
  130. # 单次执行
  131. result = await run_single_flow(run_id, question, industry, data, os.path.basename(data_file))
  132. end_time = time.perf_counter()
  133. total_time += (end_time - start_time)
  134. results.append(result)
  135. if result.get('success'):
  136. successful_runs += 1
  137. else:
  138. failed_runs += 1
  139. # 添加短暂延迟,避免API调用过于频繁
  140. if i < runs: # 最后一次不需要延迟
  141. await asyncio.sleep(1)
  142. # 输出统计结果
  143. print(f"\n{'='*80}")
  144. print("📊 批量运行完成统计")
  145. print(f"{'='*80}")
  146. print(f"总运行次数: {runs}")
  147. print(f"总运行总用时: {total_time:.2f}秒,单次用时:{total_time/runs:.2f}秒")
  148. print(f"成功次数: {successful_runs}")
  149. print(f"失败次数: {failed_runs}")
  150. print(f"成功率: {successful_runs/runs*100:.1f}%")
  151. # 显示各运行的日志文件夹
  152. print(f"\n📁 日志文件夹列表:")
  153. for i in range(1, runs + 1):
  154. folder_name = f"api_results_{i}"
  155. status = "✅" if results[i-1].get('success') else "❌"
  156. print(f" {status} {folder_name}")
  157. print("\n🎉 批量运行完成!")
  158. print(f"💡 提示: 每次运行的完整日志保存在对应的 api_results_[数字] 文件夹中")
  159. def main():
  160. """主函数"""
  161. print("🚀 使用配置参数运行批量任务")
  162. print(f"📊 运行次数: {RUNS}")
  163. print(f"🏭 行业: {INDUSTRY}")
  164. print(f"📁 数据文件: {DATA_FILE}")
  165. print(f"❓ 查询: {QUESTION[:50]}...")
  166. print("-" * 80)
  167. # 运行批量任务
  168. asyncio.run(run_batch(
  169. runs=RUNS,
  170. question=QUESTION,
  171. industry=INDUSTRY,
  172. data_file=DATA_FILE
  173. ))
# Start the batch only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()