- #!/usr/bin/env python3
- """
- Batch runner: run the Complete Agent Flow repeatedly
- ====================================================
- This script runs the complete agent workflow several times in a row; each run writes its logs to its own folder.
- Usage:
-     python batch_runner.py
- Configuration:
- - Number of runs: RUNS = 10
- - Industry: INDUSTRY = "农业" (agriculture)
- - Data file: DATA_FILE = "data_files/交易流水样例数据.csv"
- - Query: QUESTION = "请生成一份详细的农业经营贷流水分析报告..."
- Folder layout:
-     api_results_1/    # logs of the 1st run
-     api_results_2/    # logs of the 2nd run
-     ...
-     api_results_10/   # logs of the 10th run
- Author: Big Agent Team
- Version: 1.0.0
- Created: 2024-12-20
- """
- import asyncio
- import os
- import sys
- from typing import Any, Dict, List
- # Add the project root to sys.path so that `config` (and the `llmops` package) can be imported
- current_dir = os.path.dirname(os.path.abspath(__file__))
- parent_dir = os.path.dirname(current_dir)
- sys.path.insert(0, parent_dir)
- # Choose the import style according to how this file is executed
- if __name__ == "__main__":
-     # Run directly as a script: use absolute imports
-     from llmops.complete_agent_flow_rule import run_complete_agent_flow
-     from llmops.agents.data_manager import DataManager
- else:
-     # Imported as a module: use relative imports
-     from .complete_agent_flow_rule import run_complete_agent_flow
-     from .agents.data_manager import DataManager
- import config
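- # ========== Configuration ==========
- RUNS = 10  # number of runs
- INDUSTRY = "农业"  # industry ("agriculture"), passed to the flow unchanged
- DATA_FILE = "data_files/交易流水样例数据.csv"  # sample transaction-record CSV; relative paths are resolved against the project root
- # Analysis query (in Chinese): a detailed agricultural business-loan transaction report covering 1. total income and expenses,
- # 2. income and expense transaction counts, 3. share of each income/expense type, 4. top-3 counterparties by income and expense,
- # 5. monthly income/expense trends, 6. account count and transaction time range, 7. monthly inflow/outflow statistics
- QUESTION = "请生成一份详细的农业经营贷流水分析报告,需要包含:1.总收入和总支出统计 2.收入笔数和支出笔数 3.各类型收入支出占比分析 4.交易对手收入支出TOP3排名 5.按月份的收入支出趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标"
- # ===================================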
- async def run_single_flow(run_id: str, question: str, industry: str, data: List[Dict[str, Any]], file_name: str) -> Dict[str, Any]:
-     """
-     Run a single workflow instance.
-     Args:
-         run_id: run ID, also used to name the log folder
-         question: user query
-         industry: industry
-         data: dataset (list of records)
-         file_name: data file name
-     Returns:
-         The result dict returned by the flow, or a failure dict if an exception was raised
-     """
-     print(f"\n{'='*60}")
-     print(f"🚀 Starting run #{run_id}")
-     print(f"📁 Log folder: api_results_{run_id}")
-     print(f"{'='*60}")
-     # Set the environment variable so that every agent writes to this run's folder
-     os.environ['FLOW_RUN_ID'] = run_id
-     try:
-         result = await run_complete_agent_flow(
-             question=question,
-             industry=industry,
-             data=data,
-             file_name=file_name,
-             api_key=config.DEEPSEEK_API_KEY,
-             session_id=f"batch-run-{run_id}"
-         )
-         if result.get('success'):
-             summary = result.get('execution_summary', {})
-             print(f"✅ Run #{run_id} completed successfully")
-             print(f"   Planning steps: {summary.get('planning_steps', 0)}")
-             print(f"   Metrics computed: {summary.get('metrics_computed', 0)}")
-         else:
-             print(f"❌ Run #{run_id} failed: {result.get('error', 'unknown error')}")
-         return result
-     except Exception as e:
-         # Turn the exception into a failure result so the batch can continue
-         print(f"❌ Run #{run_id} raised an exception: {e}")
-         return {
-             "success": False,
-             "error": str(e),
-             "run_id": run_id
-         }
- async def run_batch(runs: int, question: str, industry: str, data_file: str):
-     """
-     Run the workflow several times in sequence.
-     Args:
-         runs: number of runs (must be positive)
-         question: user query
-         industry: industry
-         data_file: data file path (relative paths are resolved against the project root)
-     """
-     print("🚀 Batch runner started")
-     print(f"📊 Planned runs: {runs}")
-     print(f"🏭 Industry: {industry}")
-     print(f"📁 Data file: {data_file}")
-     print(f"❓ Query: {question}")
-     print(f"{'='*80}")
-     # The run count is used as a divisor in the summary below
-     if runs <= 0:
-         print("❌ The number of runs must be a positive integer")
-         return
-     # Check the API key
-     if not config.DEEPSEEK_API_KEY:
-         print("❌ API key not found, please check config.py")
-         return
-     # Load the data
-     try:
-         # Resolve a relative path against the project root
-         if not os.path.isabs(data_file):
-             data_file = os.path.join(parent_dir, data_file)
-         data = DataManager.load_data_from_csv_file(data_file)
-         print(f"📊 Data loaded: {len(data)} records")
-     except Exception as e:
-         print(f"❌ Failed to load data: {e}")
-         return
-     # Run statistics
-     successful_runs = 0
-     failed_runs = 0
-     results = []
-     # Total run time in seconds (the pauses between runs are not counted)
-     total_time = 0.0
-     import time
-     # Run one after another
-     for i in range(1, runs + 1):
-         run_id = str(i)
-         start_time = time.perf_counter()
-         # Single run
-         result = await run_single_flow(run_id, question, industry, data, os.path.basename(data_file))
-         end_time = time.perf_counter()
-         total_time += end_time - start_time
-         results.append(result)
-         if result.get('success'):
-             successful_runs += 1
-         else:
-             failed_runs += 1
-         # Pause briefly between runs to avoid calling the API too often
-         if i < runs:  # no pause after the last run
-             await asyncio.sleep(1)
-     # Print the summary
-     print(f"\n{'='*80}")
-     print("📊 Batch run summary")
-     print(f"{'='*80}")
-     print(f"Total runs: {runs}")
-     print(f"Total time: {total_time:.2f}s, average per run: {total_time/runs:.2f}s")
-     print(f"Successful runs: {successful_runs}")
-     print(f"Failed runs: {failed_runs}")
-     print(f"Success rate: {successful_runs/runs*100:.1f}%")
-     # List each run's log folder
-     print("\n📁 Log folders:")
-     for i, result in enumerate(results, start=1):
-         status = "✅" if result.get('success') else "❌"
-         print(f"  {status} api_results_{i}")
-     print("\n🎉 Batch run finished!")
-     print("💡 Tip: the full logs of each run are saved in the matching api_results_<n> folder")
- def main():
-     """Entry point: run the batch with the module-level configuration."""
-     print("🚀 Running the batch with the configured parameters")
-     print(f"📊 Runs: {RUNS}")
-     print(f"🏭 Industry: {INDUSTRY}")
-     print(f"📁 Data file: {DATA_FILE}")
-     print(f"❓ Query: {QUESTION[:50]}...")
-     print("-" * 80)
-     # Run the batch
-     asyncio.run(run_batch(
-         runs=RUNS,
-         question=QUESTION,
-         industry=INDUSTRY,
-         data_file=DATA_FILE
-     ))
- if __name__ == "__main__":
-     main()
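`run_single_flow` passes the run ID to the agents only through the process-wide `FLOW_RUN_ID` environment variable. The agent code that reads it is not part of this file. A minimal sketch of what the reading side could look like, assuming agents fall back to a plain `api_results` folder when the variable is unset. The helper name `log_dir_for_current_run` and the fallback are both hypothetical:

```python
import os


def log_dir_for_current_run(base: str = "api_results") -> str:
    """Hypothetical helper: derive the per-run log folder from FLOW_RUN_ID.

    Assumption: when FLOW_RUN_ID is unset (e.g. outside the batch runner),
    the agent writes to the plain base folder.
    """
    run_id = os.environ.get("FLOW_RUN_ID")
    return f"{base}_{run_id}" if run_id else base
```

An environment variable is shared by the whole process, so this scheme only works while runs execute one at a time, as `run_batch` does. Several flows running concurrently in one process could overwrite each other's `FLOW_RUN_ID`.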
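`run_batch` resolves a relative `DATA_FILE` against the project root, which is the parent of the directory containing the script (the same directory added to `sys.path`). The sketch below expresses that rule as a small pure function that can be tested without touching the file system. The name `resolve_data_path` is hypothetical:

```python
import os


def resolve_data_path(data_file: str, script_path: str) -> str:
    """Resolve a relative data path against the project root.

    Mirrors the batch runner's layout assumption: the project root is the
    parent of the directory that contains the script.
    """
    if os.path.isabs(data_file):
        return data_file  # absolute paths are used unchanged
    script_dir = os.path.dirname(os.path.abspath(script_path))
    project_root = os.path.dirname(script_dir)
    return os.path.join(project_root, data_file)
```

On POSIX, `resolve_data_path("data_files/交易流水样例数据.csv", "/repo/llmops/batch_runner.py")` returns `/repo/data_files/交易流水样例数据.csv`. Here `/repo` is only an illustrative location.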
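The main loop of `run_batch` runs the flow strictly in sequence. It times each run with `time.perf_counter()`, excluding the one-second pause between runs, and relies on `run_single_flow` to turn exceptions into failure results so that one bad run does not stop the batch. The sketch below reproduces that pattern self-contained, with a stubbed flow standing in for `run_complete_agent_flow`. `run_sequential_batch` and `fake_flow` are hypothetical names:

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List


async def run_sequential_batch(
    runs: int,
    flow: Callable[[str], Awaitable[Dict[str, Any]]],
    delay: float = 1.0,
) -> Dict[str, Any]:
    """Run `flow` `runs` times in order, timing each run and isolating failures."""
    results: List[Dict[str, Any]] = []
    total_time = 0.0
    for i in range(1, runs + 1):
        run_id = str(i)
        start = time.perf_counter()
        try:
            result = await flow(run_id)
        except Exception as e:  # one failed run must not abort the batch
            result = {"success": False, "error": str(e), "run_id": run_id}
        total_time += time.perf_counter() - start  # pause below is not timed
        results.append(result)
        if i < runs:  # no pause after the last run
            await asyncio.sleep(delay)
    return {"results": results, "total_time": total_time}


async def fake_flow(run_id: str) -> Dict[str, Any]:
    # Stand-in for the real flow: run "2" raises, the others succeed.
    if run_id == "2":
        raise RuntimeError("simulated API error")
    return {"success": True, "run_id": run_id}
```

For example, `asyncio.run(run_sequential_batch(3, fake_flow, delay=0))` completes all three runs. The second run is recorded as a failure instead of ending the batch.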
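The summary divides by the number of runs, so an empty batch has to be handled separately. The sketch below implements the same arithmetic as a pure function. It assumes that a zero-run batch should report 0% and 0 s instead of raising `ZeroDivisionError`. The name `summarize` is hypothetical:

```python
from typing import Dict, List


def summarize(successes: List[bool], total_time: float) -> Dict[str, float]:
    """Compute the batch summary shown at the end of run_batch."""
    runs = len(successes)
    ok = sum(successes)  # True counts as 1
    if runs == 0:
        # Assumption: an empty batch reports zeros rather than failing
        return {"runs": 0, "successful": 0, "failed": 0, "success_rate": 0.0, "avg_time": 0.0}
    return {
        "runs": runs,
        "successful": ok,
        "failed": runs - ok,
        "success_rate": ok / runs * 100,  # percent
        "avg_time": total_time / runs,  # seconds per run
    }
```

With four runs, three successes and 20 s in total, it reports a 75.0% success rate and 5.0 s per run.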