||
- #!/usr/bin/env python3
- """
- 多Agent协作示例 - Agent间通信与任务分配
- =====================================
- 这个文件展示了多个Agent如何协作完成复杂任务,包含:
- 1. 任务分解与分配
- 2. Agent间结果传递
- 3. 结果聚合与整合
- 4. 协作工作流设计
- 运行方法:
- python examples/multi_agent.py
- """
- import os
- import sys
- import asyncio
- from typing import Dict, Any, List, Optional
- from datetime import datetime
- from dotenv import load_dotenv
- # 加载环境变量
- load_dotenv()
- try:
- from langchain_openai import ChatOpenAI
- from langchain_core.prompts import ChatPromptTemplate
- except ImportError as e:
- print(f"❌ 缺少依赖包: {e}")
- print("请运行: pip install langchain langchain-openai python-dotenv")
- sys.exit(1)
- class TaskResult:
- """任务结果类"""
- def __init__(self, agent_name: str, task_name: str, success: bool,
- result: Any = None, error: str = None):
- self.agent_name = agent_name
- self.task_name = task_name
- self.success = success
- self.result = result
- self.error = error
- self.timestamp = datetime.now()
- def to_dict(self) -> Dict[str, Any]:
- return {
- "agent": self.agent_name,
- "task": self.task_name,
- "success": self.success,
- "result": self.result,
- "error": self.error,
- "timestamp": self.timestamp.isoformat()
- }
- class BaseAgent:
- """基础Agent类"""
- def __init__(self, name: str, specialty: str):
- self.name = name
- self.specialty = specialty
- api_key = os.getenv('DEEPSEEK_API_KEY')
- if not api_key:
- raise ValueError("请在.env文件中设置DEEPSEEK_API_KEY")
- self.llm = ChatOpenAI(
- model="deepseek-chat",
- api_key=api_key,
- base_url="https://api.deepseek.com",
- temperature=0.1
- )
- self.completed_tasks = []
- async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
- """执行任务的通用方法"""
- raise NotImplementedError("子类必须实现execute_task方法")
- def record_task(self, task_result: TaskResult):
- """记录完成的任务"""
- self.completed_tasks.append(task_result)
- class DataAnalyzerAgent(BaseAgent):
- """数据分析Agent"""
- def __init__(self):
- super().__init__("DataAnalyzer", "数据分析与统计")
- self.analysis_prompt = ChatPromptTemplate.from_messages([
- ("system", "你是一个专业的数据分析师,擅长发现数据中的模式和趋势。"),
- ("user", "请分析以下数据:\n\n{data}\n\n请提供关键发现和趋势分析。")
- ])
- async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
- try:
- data = kwargs.get("data", "")
- chain = self.analysis_prompt | self.llm
- result = await chain.ainvoke({"data": data})
- task_result = TaskResult(
- agent_name=self.name,
- task_name=task_name,
- success=True,
- result=result.content
- )
- self.record_task(task_result)
- return task_result
- except Exception as e:
- task_result = TaskResult(
- agent_name=self.name,
- task_name=task_name,
- success=False,
- error=str(e)
- )
- self.record_task(task_result)
- return task_result
- class ReportGeneratorAgent(BaseAgent):
- """报告生成Agent"""
- def __init__(self):
- super().__init__("ReportGenerator", "报告撰写与格式化")
- self.report_prompt = ChatPromptTemplate.from_messages([
- ("system", "你是一个专业的报告撰写专家,擅长将分析结果整理成清晰的报告。"),
- ("user", "基于以下分析结果生成一份完整的分析报告:\n\n{analysis_result}\n\n请包含:执行摘要、详细分析、结论和建议。")
- ])
- async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
- try:
- analysis_result = kwargs.get("analysis_result", "")
- chain = self.report_prompt | self.llm
- result = await chain.ainvoke({"analysis_result": analysis_result})
- task_result = TaskResult(
- agent_name=self.name,
- task_name=task_name,
- success=True,
- result=result.content
- )
- self.record_task(task_result)
- return task_result
- except Exception as e:
- task_result = TaskResult(
- agent_name=self.name,
- task_name=task_name,
- success=False,
- error=str(e)
- )
- self.record_task(task_result)
- return task_result
- class QualityCheckerAgent(BaseAgent):
- """质量检查Agent"""
- def __init__(self):
- super().__init__("QualityChecker", "质量评估与验证")
- self.quality_prompt = ChatPromptTemplate.from_messages([
- ("system", "你是一个严格的质量检查专家,负责评估分析结果的质量和准确性。"),
- ("user", "请检查以下分析报告的质量:\n\n{report}\n\n请评估:1)准确性 2)完整性 3)清晰度 4)实用性。给出评分(1-10)和改进建议。")
- ])
- async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
- try:
- report = kwargs.get("report", "")
- chain = self.quality_prompt | self.llm
- result = await chain.ainvoke({"report": report})
- task_result = TaskResult(
- agent_name=self.name,
- task_name=task_name,
- success=True,
- result=result.content
- )
- self.record_task(task_result)
- return task_result
- except Exception as e:
- task_result = TaskResult(
- agent_name=self.name,
- task_name=task_name,
- success=False,
- error=str(e)
- )
- self.record_task(task_result)
- return task_result
- class MultiAgentSystem:
- """多Agent协作系统"""
- def __init__(self):
- self.agents = {
- "analyzer": DataAnalyzerAgent(),
- "reporter": ReportGeneratorAgent(),
- "checker": QualityCheckerAgent()
- }
- self.workflow_history = []
- def get_agent(self, agent_type: str) -> BaseAgent:
- """获取指定类型的Agent"""
- return self.agents.get(agent_type)
- async def execute_workflow(self, data: str) -> Dict[str, Any]:
- """
- 执行完整的工作流:
- 1. 数据分析
- 2. 报告生成
- 3. 质量检查
- """
- print("🚀 开始多Agent协作工作流")
- print(f"📊 输入数据: {data[:50]}...")
- workflow_start = datetime.now()
- results = {}
- try:
- # 步骤1: 数据分析
- print("\n1️⃣ 执行数据分析...")
- analyzer = self.get_agent("analyzer")
- analysis_result = await analyzer.execute_task("data_analysis", data=data)
- results["analysis"] = analysis_result.to_dict()
- if not analysis_result.success:
- raise Exception(f"数据分析失败: {analysis_result.error}")
- print("✅ 数据分析完成")
- # 步骤2: 报告生成
- print("\n2️⃣ 生成分析报告...")
- reporter = self.get_agent("reporter")
- report_result = await reporter.execute_task(
- "report_generation",
- analysis_result=analysis_result.result
- )
- results["report"] = report_result.to_dict()
- if not report_result.success:
- raise Exception(f"报告生成失败: {report_result.error}")
- print("✅ 报告生成完成")
- # 步骤3: 质量检查
- print("\n3️⃣ 执行质量检查...")
- checker = self.get_agent("checker")
- quality_result = await checker.execute_task(
- "quality_check",
- report=report_result.result
- )
- results["quality"] = quality_result.to_dict()
- if not quality_result.success:
- raise Exception(f"质量检查失败: {quality_result.error}")
- print("✅ 质量检查完成")
- # 记录成功的工作流
- workflow_end = datetime.now()
- workflow_record = {
- "success": True,
- "start_time": workflow_start.isoformat(),
- "end_time": workflow_end.isoformat(),
- "duration": (workflow_end - workflow_start).total_seconds(),
- "steps_completed": 3,
- "results": results
- }
- self.workflow_history.append(workflow_record)
- print(f"\n🎉 工作流执行成功!总耗时: {workflow_record['duration']:.2f}秒")
- return {
- "success": True,
- "workflow": workflow_record,
- "final_report": report_result.result,
- "quality_assessment": quality_result.result
- }
- except Exception as e:
- workflow_end = datetime.now()
- error_msg = str(e)
- workflow_record = {
- "success": False,
- "start_time": workflow_start.isoformat(),
- "end_time": workflow_end.isoformat(),
- "duration": (workflow_end - workflow_start).total_seconds(),
- "error": error_msg,
- "results": results
- }
- self.workflow_history.append(workflow_record)
- print(f"\n❌ 工作流执行失败: {error_msg}")
- return {
- "success": False,
- "error": error_msg,
- "workflow": workflow_record
- }
- def get_system_status(self) -> Dict[str, Any]:
- """获取系统状态"""
- agent_status = {}
- for agent_type, agent in self.agents.items():
- agent_status[agent_type] = {
- "name": agent.name,
- "specialty": agent.specialty,
- "tasks_completed": len(agent.completed_tasks)
- }
- return {
- "agents": agent_status,
- "total_workflows": len(self.workflow_history),
- "successful_workflows": sum(1 for w in self.workflow_history if w["success"]),
- "failed_workflows": sum(1 for w in self.workflow_history if not w["success"])
- }
- def create_sample_data() -> str:
- """创建示例数据"""
- data = """
- 销售数据分析:
- - 2024年第一季度总销售额:150万元
- - 各月销售额:1月45万、2月50万、3月55万
- - 主要产品:A产品(40%)、B产品(35%)、C产品(25%)
- - 客户数量:新增客户120个,回头客80个
- - 地区分布:华北35%、华东30%、华南20%、其他15%
- 趋势观察:
- - 销售额逐月上升,增长率约11%
- - A产品销售占比略有下降
- - 新客户获取率稳步提升
- """
- return data.strip()
- async def main():
- """主函数 - 演示多Agent协作"""
- print("🚀 多Agent协作示例 - Agent间通信与任务分配")
- print("=" * 70)
- try:
- # 创建多Agent系统
- system = MultiAgentSystem()
- # 显示系统状态
- status = system.get_system_status()
- print("🤖 系统初始化完成")
- print(f"📊 可用Agent: {len(status['agents'])}个")
- for agent_type, info in status['agents'].items():
- print(f" • {info['name']}: {info['specialty']}")
- # 准备测试数据
- sample_data = create_sample_data()
- print("\n📋 测试数据:")
- print(sample_data)
- print("-" * 50)
- # 执行协作工作流
- result = await system.execute_workflow(sample_data)
- if result["success"]:
- print("\n📄 最终分析报告:")
- report_preview = result["final_report"][:300] + "..." if len(result["final_report"]) > 300 else result["final_report"]
- print(report_preview)
- print("\n⭐ 质量评估:")
- quality_preview = result["quality_assessment"][:200] + "..." if len(result["quality_assessment"]) > 200 else result["quality_assessment"]
- print(quality_preview)
- # 显示系统最终状态
- final_status = system.get_system_status()
- print("\n📊 执行统计:")
- print(f"总工作流数: {final_status['total_workflows']}")
- print(f"成功执行: {final_status['successful_workflows']}")
- print(f"失败执行: {final_status['failed_workflows']}")
- for agent_type, info in final_status['agents'].items():
- print(f"{info['name']}完成任务数: {info['tasks_completed']}")
- else:
- print(f"❌ 执行失败: {result['error']}")
- print("\n🎉 多Agent协作示例完成!")
- print("\n💡 多Agent协作学习要点:")
- print("1. 任务分解: 将复杂任务拆分为多个专门步骤")
- print("2. Agent分工: 每个Agent负责特定的专业任务")
- print("3. 结果传递: Agent间通过TaskResult传递数据")
- print("4. 错误处理: 任何一个环节失败都会终止整个流程")
- print("5. 状态跟踪: 记录每个Agent的执行历史")
- print("6. 协作编排: 设计合理的执行顺序和依赖关系")
- print("\n📚 下一步学习:")
- print("- 查看项目中的CompleteAgentFlow实现")
- print("- 学习PRACTICE_GUIDE.md中的Phase 4和5")
- print("- 尝试添加新的Agent类型到协作系统中")
- except Exception as e:
- print(f"❌ 运行出错: {e}")
- print("\n🔧 故障排除:")
- print("1. 检查.env文件中的API密钥")
- print("2. 确认网络连接正常")
- print("3. 检查Agent初始化是否成功")
- if __name__ == "__main__":
- asyncio.run(main())
|