multi_agent.py 14 KB


  1. #!/usr/bin/env python3
  2. """
  3. 多Agent协作示例 - Agent间通信与任务分配
  4. =====================================
  5. 这个文件展示了多个Agent如何协作完成复杂任务,包含:
  6. 1. 任务分解与分配
  7. 2. Agent间结果传递
  8. 3. 结果聚合与整合
  9. 4. 协作工作流设计
  10. 运行方法:
  11. python examples/multi_agent.py
  12. """
  13. import os
  14. import sys
  15. import asyncio
  16. from typing import Dict, Any, List, Optional
  17. from datetime import datetime
  18. from dotenv import load_dotenv
  19. # 加载环境变量
  20. load_dotenv()
  21. try:
  22. from langchain_openai import ChatOpenAI
  23. from langchain_core.prompts import ChatPromptTemplate
  24. except ImportError as e:
  25. print(f"❌ 缺少依赖包: {e}")
  26. print("请运行: pip install langchain langchain-openai python-dotenv")
  27. sys.exit(1)
  28. class TaskResult:
  29. """任务结果类"""
  30. def __init__(self, agent_name: str, task_name: str, success: bool,
  31. result: Any = None, error: str = None):
  32. self.agent_name = agent_name
  33. self.task_name = task_name
  34. self.success = success
  35. self.result = result
  36. self.error = error
  37. self.timestamp = datetime.now()
  38. def to_dict(self) -> Dict[str, Any]:
  39. return {
  40. "agent": self.agent_name,
  41. "task": self.task_name,
  42. "success": self.success,
  43. "result": self.result,
  44. "error": self.error,
  45. "timestamp": self.timestamp.isoformat()
  46. }
  47. class BaseAgent:
  48. """基础Agent类"""
  49. def __init__(self, name: str, specialty: str):
  50. self.name = name
  51. self.specialty = specialty
  52. api_key = os.getenv('DEEPSEEK_API_KEY')
  53. if not api_key:
  54. raise ValueError("请在.env文件中设置DEEPSEEK_API_KEY")
  55. self.llm = ChatOpenAI(
  56. model="deepseek-chat",
  57. api_key=api_key,
  58. base_url="https://api.deepseek.com",
  59. temperature=0.1
  60. )
  61. self.completed_tasks = []
  62. async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
  63. """执行任务的通用方法"""
  64. raise NotImplementedError("子类必须实现execute_task方法")
  65. def record_task(self, task_result: TaskResult):
  66. """记录完成的任务"""
  67. self.completed_tasks.append(task_result)
  68. class DataAnalyzerAgent(BaseAgent):
  69. """数据分析Agent"""
  70. def __init__(self):
  71. super().__init__("DataAnalyzer", "数据分析与统计")
  72. self.analysis_prompt = ChatPromptTemplate.from_messages([
  73. ("system", "你是一个专业的数据分析师,擅长发现数据中的模式和趋势。"),
  74. ("user", "请分析以下数据:\n\n{data}\n\n请提供关键发现和趋势分析。")
  75. ])
  76. async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
  77. try:
  78. data = kwargs.get("data", "")
  79. chain = self.analysis_prompt | self.llm
  80. result = await chain.ainvoke({"data": data})
  81. task_result = TaskResult(
  82. agent_name=self.name,
  83. task_name=task_name,
  84. success=True,
  85. result=result.content
  86. )
  87. self.record_task(task_result)
  88. return task_result
  89. except Exception as e:
  90. task_result = TaskResult(
  91. agent_name=self.name,
  92. task_name=task_name,
  93. success=False,
  94. error=str(e)
  95. )
  96. self.record_task(task_result)
  97. return task_result
  98. class ReportGeneratorAgent(BaseAgent):
  99. """报告生成Agent"""
  100. def __init__(self):
  101. super().__init__("ReportGenerator", "报告撰写与格式化")
  102. self.report_prompt = ChatPromptTemplate.from_messages([
  103. ("system", "你是一个专业的报告撰写专家,擅长将分析结果整理成清晰的报告。"),
  104. ("user", "基于以下分析结果生成一份完整的分析报告:\n\n{analysis_result}\n\n请包含:执行摘要、详细分析、结论和建议。")
  105. ])
  106. async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
  107. try:
  108. analysis_result = kwargs.get("analysis_result", "")
  109. chain = self.report_prompt | self.llm
  110. result = await chain.ainvoke({"analysis_result": analysis_result})
  111. task_result = TaskResult(
  112. agent_name=self.name,
  113. task_name=task_name,
  114. success=True,
  115. result=result.content
  116. )
  117. self.record_task(task_result)
  118. return task_result
  119. except Exception as e:
  120. task_result = TaskResult(
  121. agent_name=self.name,
  122. task_name=task_name,
  123. success=False,
  124. error=str(e)
  125. )
  126. self.record_task(task_result)
  127. return task_result
  128. class QualityCheckerAgent(BaseAgent):
  129. """质量检查Agent"""
  130. def __init__(self):
  131. super().__init__("QualityChecker", "质量评估与验证")
  132. self.quality_prompt = ChatPromptTemplate.from_messages([
  133. ("system", "你是一个严格的质量检查专家,负责评估分析结果的质量和准确性。"),
  134. ("user", "请检查以下分析报告的质量:\n\n{report}\n\n请评估:1)准确性 2)完整性 3)清晰度 4)实用性。给出评分(1-10)和改进建议。")
  135. ])
  136. async def execute_task(self, task_name: str, **kwargs) -> TaskResult:
  137. try:
  138. report = kwargs.get("report", "")
  139. chain = self.quality_prompt | self.llm
  140. result = await chain.ainvoke({"report": report})
  141. task_result = TaskResult(
  142. agent_name=self.name,
  143. task_name=task_name,
  144. success=True,
  145. result=result.content
  146. )
  147. self.record_task(task_result)
  148. return task_result
  149. except Exception as e:
  150. task_result = TaskResult(
  151. agent_name=self.name,
  152. task_name=task_name,
  153. success=False,
  154. error=str(e)
  155. )
  156. self.record_task(task_result)
  157. return task_result
  158. class MultiAgentSystem:
  159. """多Agent协作系统"""
  160. def __init__(self):
  161. self.agents = {
  162. "analyzer": DataAnalyzerAgent(),
  163. "reporter": ReportGeneratorAgent(),
  164. "checker": QualityCheckerAgent()
  165. }
  166. self.workflow_history = []
  167. def get_agent(self, agent_type: str) -> BaseAgent:
  168. """获取指定类型的Agent"""
  169. return self.agents.get(agent_type)
  170. async def execute_workflow(self, data: str) -> Dict[str, Any]:
  171. """
  172. 执行完整的工作流:
  173. 1. 数据分析
  174. 2. 报告生成
  175. 3. 质量检查
  176. """
  177. print("🚀 开始多Agent协作工作流")
  178. print(f"📊 输入数据: {data[:50]}...")
  179. workflow_start = datetime.now()
  180. results = {}
  181. try:
  182. # 步骤1: 数据分析
  183. print("\n1️⃣ 执行数据分析...")
  184. analyzer = self.get_agent("analyzer")
  185. analysis_result = await analyzer.execute_task("data_analysis", data=data)
  186. results["analysis"] = analysis_result.to_dict()
  187. if not analysis_result.success:
  188. raise Exception(f"数据分析失败: {analysis_result.error}")
  189. print("✅ 数据分析完成")
  190. # 步骤2: 报告生成
  191. print("\n2️⃣ 生成分析报告...")
  192. reporter = self.get_agent("reporter")
  193. report_result = await reporter.execute_task(
  194. "report_generation",
  195. analysis_result=analysis_result.result
  196. )
  197. results["report"] = report_result.to_dict()
  198. if not report_result.success:
  199. raise Exception(f"报告生成失败: {report_result.error}")
  200. print("✅ 报告生成完成")
  201. # 步骤3: 质量检查
  202. print("\n3️⃣ 执行质量检查...")
  203. checker = self.get_agent("checker")
  204. quality_result = await checker.execute_task(
  205. "quality_check",
  206. report=report_result.result
  207. )
  208. results["quality"] = quality_result.to_dict()
  209. if not quality_result.success:
  210. raise Exception(f"质量检查失败: {quality_result.error}")
  211. print("✅ 质量检查完成")
  212. # 记录成功的工作流
  213. workflow_end = datetime.now()
  214. workflow_record = {
  215. "success": True,
  216. "start_time": workflow_start.isoformat(),
  217. "end_time": workflow_end.isoformat(),
  218. "duration": (workflow_end - workflow_start).total_seconds(),
  219. "steps_completed": 3,
  220. "results": results
  221. }
  222. self.workflow_history.append(workflow_record)
  223. print(f"\n🎉 工作流执行成功!总耗时: {workflow_record['duration']:.2f}秒")
  224. return {
  225. "success": True,
  226. "workflow": workflow_record,
  227. "final_report": report_result.result,
  228. "quality_assessment": quality_result.result
  229. }
  230. except Exception as e:
  231. workflow_end = datetime.now()
  232. error_msg = str(e)
  233. workflow_record = {
  234. "success": False,
  235. "start_time": workflow_start.isoformat(),
  236. "end_time": workflow_end.isoformat(),
  237. "duration": (workflow_end - workflow_start).total_seconds(),
  238. "error": error_msg,
  239. "results": results
  240. }
  241. self.workflow_history.append(workflow_record)
  242. print(f"\n❌ 工作流执行失败: {error_msg}")
  243. return {
  244. "success": False,
  245. "error": error_msg,
  246. "workflow": workflow_record
  247. }
  248. def get_system_status(self) -> Dict[str, Any]:
  249. """获取系统状态"""
  250. agent_status = {}
  251. for agent_type, agent in self.agents.items():
  252. agent_status[agent_type] = {
  253. "name": agent.name,
  254. "specialty": agent.specialty,
  255. "tasks_completed": len(agent.completed_tasks)
  256. }
  257. return {
  258. "agents": agent_status,
  259. "total_workflows": len(self.workflow_history),
  260. "successful_workflows": sum(1 for w in self.workflow_history if w["success"]),
  261. "failed_workflows": sum(1 for w in self.workflow_history if not w["success"])
  262. }
  263. def create_sample_data() -> str:
  264. """创建示例数据"""
  265. data = """
  266. 销售数据分析:
  267. - 2024年第一季度总销售额:150万元
  268. - 各月销售额:1月45万、2月50万、3月55万
  269. - 主要产品:A产品(40%)、B产品(35%)、C产品(25%)
  270. - 客户数量:新增客户120个,回头客80个
  271. - 地区分布:华北35%、华东30%、华南20%、其他15%
  272. 趋势观察:
  273. - 销售额逐月上升,增长率约11%
  274. - A产品销售占比略有下降
  275. - 新客户获取率稳步提升
  276. """
  277. return data.strip()
  278. async def main():
  279. """主函数 - 演示多Agent协作"""
  280. print("🚀 多Agent协作示例 - Agent间通信与任务分配")
  281. print("=" * 70)
  282. try:
  283. # 创建多Agent系统
  284. system = MultiAgentSystem()
  285. # 显示系统状态
  286. status = system.get_system_status()
  287. print("🤖 系统初始化完成")
  288. print(f"📊 可用Agent: {len(status['agents'])}个")
  289. for agent_type, info in status['agents'].items():
  290. print(f" • {info['name']}: {info['specialty']}")
  291. # 准备测试数据
  292. sample_data = create_sample_data()
  293. print("\n📋 测试数据:")
  294. print(sample_data)
  295. print("-" * 50)
  296. # 执行协作工作流
  297. result = await system.execute_workflow(sample_data)
  298. if result["success"]:
  299. print("\n📄 最终分析报告:")
  300. report_preview = result["final_report"][:300] + "..." if len(result["final_report"]) > 300 else result["final_report"]
  301. print(report_preview)
  302. print("\n⭐ 质量评估:")
  303. quality_preview = result["quality_assessment"][:200] + "..." if len(result["quality_assessment"]) > 200 else result["quality_assessment"]
  304. print(quality_preview)
  305. # 显示系统最终状态
  306. final_status = system.get_system_status()
  307. print("\n📊 执行统计:")
  308. print(f"总工作流数: {final_status['total_workflows']}")
  309. print(f"成功执行: {final_status['successful_workflows']}")
  310. print(f"失败执行: {final_status['failed_workflows']}")
  311. for agent_type, info in final_status['agents'].items():
  312. print(f"{info['name']}完成任务数: {info['tasks_completed']}")
  313. else:
  314. print(f"❌ 执行失败: {result['error']}")
  315. print("\n🎉 多Agent协作示例完成!")
  316. print("\n💡 多Agent协作学习要点:")
  317. print("1. 任务分解: 将复杂任务拆分为多个专门步骤")
  318. print("2. Agent分工: 每个Agent负责特定的专业任务")
  319. print("3. 结果传递: Agent间通过TaskResult传递数据")
  320. print("4. 错误处理: 任何一个环节失败都会终止整个流程")
  321. print("5. 状态跟踪: 记录每个Agent的执行历史")
  322. print("6. 协作编排: 设计合理的执行顺序和依赖关系")
  323. print("\n📚 下一步学习:")
  324. print("- 查看项目中的CompleteAgentFlow实现")
  325. print("- 学习PRACTICE_GUIDE.md中的Phase 4和5")
  326. print("- 尝试添加新的Agent类型到协作系统中")
  327. except Exception as e:
  328. print(f"❌ 运行出错: {e}")
  329. print("\n🔧 故障排除:")
  330. print("1. 检查.env文件中的API密钥")
  331. print("2. 确认网络连接正常")
  332. print("3. 检查Agent初始化是否成功")
  333. if __name__ == "__main__":
  334. asyncio.run(main())