#!/usr/bin/env python3 """ 批量运行器 - 批量执行Complete Agent Flow ======================================== 此脚本可以批量运行多次完整的智能体工作流,每次运行会创建独立的日志文件夹。 使用方法: python batch_runner.py 配置参数: - 运行次数: RUNS = 10 - 行业: INDUSTRY = "农业" - 数据文件: DATA_FILE = "data_files/交易流水样例数据.csv" - 查询问题: QUESTION = "请生成一份详细的农业经营贷流水分析报告..." 文件夹结构: api_results_1/ # 第一次运行的日志 api_results_2/ # 第二次运行的日志 ... api_results_10/ # 第十次运行的日志 作者: Big Agent Team 版本: 1.0.0 创建时间: 2024-12-20 """ import asyncio import os from datetime import datetime from typing import List, Dict, Any import sys import os # 添加项目根目录到路径,以便导入config current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) sys.path.insert(0, parent_dir) # 根据执行方式选择导入方式 if __name__ == "__main__": # 直接执行文件时,使用绝对导入 from llmops.complete_agent_flow_rule import run_complete_agent_flow from llmops.agents.data_manager import DataManager else: # 作为模块导入时,使用相对导入 from .complete_agent_flow_rule import run_complete_agent_flow from .agents.data_manager import DataManager import config # ========== 配置参数 ========== RUNS = 2 # 运行次数 INDUSTRY = "农业" # 行业 DATA_FILE = "data_files/交易流水样例数据.csv" # 数据文件路径 QUESTION = "请生成一份详细的农业经营贷流水分析报告,需要包含:1.总收入和总支出统计 2.收入笔数和支出笔数 3.各类型收入支出占比分析 4.交易对手收入支出TOP3排名 5.按月份的收入支出趋势分析 6.账户数量和交易时间范围统计 7.资金流入流出月度统计等全面指标" # 分析查询 # ============================== async def run_single_flow(run_id: str, question: str, industry: str, data: List[Dict[str, Any]], file_name: str) -> Dict[str, Any]: """ 运行单个工作流实例 Args: run_id: 运行ID question: 用户查询 industry: 行业 data: 数据集 file_name: 文件名 Returns: 运行结果 """ print(f"\n{'='*60}") print(f"🚀 开始运行 #{run_id}") print(f"📁 日志文件夹: api_results_{run_id}") print(f"{'='*60}") # 设置环境变量,让所有agent使用正确的文件夹 os.environ['FLOW_RUN_ID'] = run_id try: result = await run_complete_agent_flow( question=question, industry=industry, data=data, file_name=file_name, api_key=config.DEEPSEEK_API_KEY, session_id=f"batch-run-{run_id}" ) if result.get('success'): summary = result.get('execution_summary', {}) print(f"✅ 运行 #{run_id} 成功完成") print(f" 规划步骤: {summary.get('planning_steps', 0)}") print(f" 指标计算: {summary.get('metrics_computed', 0)}") else: print(f"❌ 运行 #{run_id} 失败: {result.get('error', '未知错误')}") return result except Exception as e: print(f"❌ 运行 #{run_id} 发生异常: {e}") return { "success": False, "error": str(e), "run_id": run_id } async def run_batch(runs: int, question: str, industry: str, data_file: str): """ 批量运行工作流 Args: runs: 运行次数 question: 用户查询 industry: 行业 data_file: 数据文件路径 """ print("🚀 批量运行器启动") print(f"📊 计划运行次数: {runs}") print(f"🏭 行业: {industry}") print(f"📁 数据文件: {data_file}") print(f"❓ 查询: {question}") print(f"{'='*80}") # 检查API密钥 if not config.DEEPSEEK_API_KEY: print("❌ 未找到API密钥,请检查config.py") return # 加载数据 try: # 如果是相对路径,从项目根目录查找 if not os.path.isabs(data_file): data_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), data_file) data = DataManager.load_data_from_csv_file(data_file) print(f"📊 数据加载成功: {len(data)} 条记录") except Exception as e: print(f"❌ 数据加载失败: {e}") return # 运行结果统计 successful_runs = 0 failed_runs = 0 results = [] # 逐个运行 for i in range(1, runs + 1): run_id = str(i) result = await run_single_flow(run_id, question, industry, data, os.path.basename(data_file)) results.append(result) if result.get('success'): successful_runs += 1 else: failed_runs += 1 # 添加短暂延迟,避免API调用过于频繁 if i < runs: # 最后一次不需要延迟 await asyncio.sleep(1) # 输出统计结果 print(f"\n{'='*80}") print("📊 批量运行完成统计") print(f"{'='*80}") print(f"总运行次数: {runs}") print(f"成功次数: {successful_runs}") print(f"失败次数: {failed_runs}") print(f"成功率: {successful_runs/runs*100:.1f}%") # 显示各运行的日志文件夹 print(f"\n📁 日志文件夹列表:") for i in range(1, runs + 1): folder_name = f"api_results_{i}" status = "✅" if results[i-1].get('success') else "❌" print(f" {status} {folder_name}") print("\n🎉 批量运行完成!") print(f"💡 提示: 每次运行的完整日志保存在对应的 api_results_[数字] 文件夹中") def main(): """主函数""" print("🚀 使用配置参数运行批量任务") print(f"📊 运行次数: {RUNS}") print(f"🏭 行业: {INDUSTRY}") print(f"📁 数据文件: {DATA_FILE}") print(f"❓ 查询: {QUESTION[:50]}...") print("-" * 80) # 运行批量任务 asyncio.run(run_batch( runs=RUNS, question=QUESTION, industry=INDUSTRY, data_file=DATA_FILE )) if __name__ == "__main__": main()