advanced_agent.py 12 KB


  1. #!/usr/bin/env python3
  2. """
  3. 高级Agent示例 - 结构化输出与错误处理
  4. ===================================
  5. 这个文件展示了高级Agent功能,包含:
  6. 1. Pydantic数据模型
  7. 2. 结构化JSON输出
  8. 3. 错误处理与重试
  9. 4. 结果验证
  10. 5. 日志记录
  11. 运行方法:
  12. python examples/advanced_agent.py
  13. """
  14. import os
  15. import sys
  16. import json
  17. from typing import Dict, Any, List, Optional
  18. from datetime import datetime
  19. from dotenv import load_dotenv
  20. # 加载环境变量
  21. load_dotenv()
  22. try:
  23. from langchain_openai import ChatOpenAI
  24. from langchain_core.prompts import ChatPromptTemplate
  25. from langchain_core.output_parsers import JsonOutputParser
  26. from pydantic import BaseModel, Field, ValidationError
  27. except ImportError as e:
  28. print(f"❌ 缺少依赖包: {e}")
  29. print("请运行: pip install langchain langchain-openai pydantic python-dotenv")
  30. sys.exit(1)
  31. class AnalysisMetrics(BaseModel):
  32. """分析指标数据模型"""
  33. total_records: int = Field(description="总记录数")
  34. valid_records: int = Field(description="有效记录数")
  35. invalid_records: int = Field(description="无效记录数")
  36. completeness_rate: float = Field(description="完整性比率", ge=0, le=1)
  37. unique_values: int = Field(description="唯一值数量")
  38. class DataQualityReport(BaseModel):
  39. """数据质量报告"""
  40. dataset_name: str = Field(description="数据集名称")
  41. analysis_date: str = Field(description="分析日期")
  42. overall_score: float = Field(description="整体质量评分", ge=0, le=1)
  43. metrics: AnalysisMetrics = Field(description="详细指标")
  44. recommendations: List[str] = Field(description="改进建议")
  45. warnings: List[str] = Field(description="警告信息")
  46. class AdvancedAgent:
  47. """高级Agent - 支持结构化输出和错误处理"""
  48. def __init__(self, max_retries: int = 3):
  49. """初始化高级Agent"""
  50. api_key = os.getenv('DEEPSEEK_API_KEY')
  51. if not api_key:
  52. raise ValueError("请在.env文件中设置DEEPSEEK_API_KEY")
  53. # 初始化LLM
  54. self.llm = ChatOpenAI(
  55. model="deepseek-chat",
  56. api_key=api_key,
  57. base_url="https://api.deepseek.com",
  58. temperature=0.1
  59. )
  60. self.max_retries = max_retries
  61. self.call_history = []
  62. print("✅ AdvancedAgent初始化完成")
  63. def create_quality_analysis_prompt(self) -> ChatPromptTemplate:
  64. """创建数据质量分析提示词"""
  65. parser = JsonOutputParser(pydantic_object=DataQualityReport)
  66. template = """你是一个专业的数据质量分析师,请分析提供的数据集并生成详细的质量报告。
  67. 数据集信息:
  68. 名称: {dataset_name}
  69. 记录数量: {record_count}
  70. 数据样例: {data_sample}
  71. 请按以下JSON格式输出分析报告:
  72. {format_instructions}
  73. 要求:
  74. 1. 计算总记录数、有效记录数、无效记录数
  75. 2. 评估数据完整性(0-1之间的分数)
  76. 3. 识别唯一值数量
  77. 4. 给出整体质量评分(0-1之间)
  78. 5. 提供至少2条改进建议
  79. 6. 如果发现问题,请在warnings中列出
  80. 确保所有数值字段都是数字类型,字符串字段是字符串类型。"""
  81. return ChatPromptTemplate.from_template(
  82. template,
  83. partial_variables={"format_instructions": parser.get_format_instructions()}
  84. )
  85. def analyze_data_quality(self, dataset_name: str, data: List[Dict[str, Any]]) -> Dict[str, Any]:
  86. """
  87. 分析数据质量
  88. Args:
  89. dataset_name: 数据集名称
  90. data: 数据列表
  91. Returns:
  92. 分析结果
  93. """
  94. start_time = datetime.now()
  95. try:
  96. # 准备数据样例
  97. data_sample = json.dumps(data[:3], ensure_ascii=False, indent=2) if data else "无数据"
  98. # 创建提示词
  99. prompt = self.create_quality_analysis_prompt()
  100. chain = prompt | self.llm | JsonOutputParser(pydantic_object=DataQualityReport)
  101. # 执行分析(带重试机制)
  102. result = None
  103. last_error = None
  104. for attempt in range(self.max_retries):
  105. try:
  106. print(f"🔍 执行数据质量分析 (尝试 {attempt + 1}/{self.max_retries})")
  107. raw_result = chain.invoke({
  108. "dataset_name": dataset_name,
  109. "record_count": len(data),
  110. "data_sample": data_sample
  111. })
  112. # 验证和转换结果
  113. result = DataQualityReport(**raw_result)
  114. break
  115. except (ValidationError, json.JSONDecodeError) as e:
  116. last_error = f"解析错误: {str(e)}"
  117. print(f"⚠️ 尝试 {attempt + 1} 失败: {last_error}")
  118. if attempt < self.max_retries - 1:
  119. continue
  120. except Exception as e:
  121. last_error = f"执行错误: {str(e)}"
  122. print(f"❌ 尝试 {attempt + 1} 失败: {last_error}")
  123. if attempt < self.max_retries - 1:
  124. continue
  125. # 记录调用历史
  126. end_time = datetime.now()
  127. call_record = {
  128. "timestamp": end_time.isoformat(),
  129. "duration": (end_time - start_time).total_seconds(),
  130. "function": "analyze_data_quality",
  131. "dataset": dataset_name,
  132. "success": result is not None,
  133. "attempts": attempt + 1 if 'attempt' in locals() else 1,
  134. "error": last_error if result is None else None
  135. }
  136. self.call_history.append(call_record)
  137. if result:
  138. print("✅ 数据质量分析完成")
  139. return {
  140. "success": True,
  141. "result": result.dict(),
  142. "call_info": call_record
  143. }
  144. else:
  145. print(f"❌ 数据质量分析失败: {last_error}")
  146. return {
  147. "success": False,
  148. "error": last_error,
  149. "call_info": call_record
  150. }
  151. except Exception as e:
  152. end_time = datetime.now()
  153. error_msg = f"意外错误: {str(e)}"
  154. call_record = {
  155. "timestamp": end_time.isoformat(),
  156. "duration": (end_time - start_time).total_seconds(),
  157. "function": "analyze_data_quality",
  158. "dataset": dataset_name,
  159. "success": False,
  160. "attempts": 1,
  161. "error": error_msg
  162. }
  163. self.call_history.append(call_record)
  164. print(f"❌ 数据质量分析异常: {error_msg}")
  165. return {
  166. "success": False,
  167. "error": error_msg,
  168. "call_info": call_record
  169. }
  170. def generate_summary_report(self) -> Dict[str, Any]:
  171. """生成调用历史摘要报告"""
  172. if not self.call_history:
  173. return {"message": "暂无调用历史"}
  174. total_calls = len(self.call_history)
  175. successful_calls = sum(1 for call in self.call_history if call["success"])
  176. failed_calls = total_calls - successful_calls
  177. total_duration = sum(call["duration"] for call in self.call_history)
  178. avg_duration = total_duration / total_calls if total_calls > 0 else 0
  179. return {
  180. "total_calls": total_calls,
  181. "successful_calls": successful_calls,
  182. "failed_calls": failed_calls,
  183. "success_rate": successful_calls / total_calls if total_calls > 0 else 0,
  184. "total_duration": round(total_duration, 2),
  185. "average_duration": round(avg_duration, 2),
  186. "call_history": self.call_history[-5:] # 最近5次调用
  187. }
  188. def create_sample_data() -> List[Dict[str, Any]]:
  189. """创建示例数据"""
  190. return [
  191. {
  192. "id": 1,
  193. "name": "张三",
  194. "age": 25,
  195. "city": "北京",
  196. "salary": 5000,
  197. "department": "技术部"
  198. },
  199. {
  200. "id": 2,
  201. "name": "李四",
  202. "age": 30,
  203. "city": "上海",
  204. "salary": 6000,
  205. "department": "销售部"
  206. },
  207. {
  208. "id": 3,
  209. "name": "王五",
  210. "age": None, # 缺失数据
  211. "city": "广州",
  212. "salary": None, # 缺失数据
  213. "department": "技术部"
  214. },
  215. {
  216. "id": 4,
  217. "name": "赵六",
  218. "age": 35,
  219. "city": "深圳",
  220. "salary": 7000,
  221. "department": "财务部"
  222. },
  223. {
  224. "id": 5,
  225. "name": "张三", # 重复数据
  226. "age": 25,
  227. "city": "北京",
  228. "salary": 5000,
  229. "department": "技术部"
  230. }
  231. ]
  232. def main():
  233. """主函数 - 演示高级Agent功能"""
  234. print("🚀 高级Agent示例 - 结构化输出与错误处理")
  235. print("=" * 60)
  236. try:
  237. # 创建Agent实例
  238. agent = AdvancedAgent(max_retries=2)
  239. # 准备测试数据
  240. sample_data = create_sample_data()
  241. print(f"\n🧪 测试数据:")
  242. print(f"数据集: 示例员工数据")
  243. print(f"记录数: {len(sample_data)}")
  244. print(f"数据样例: {json.dumps(sample_data[0], ensure_ascii=False, indent=2)}")
  245. # 执行数据质量分析
  246. print("\n🔍 开始数据质量分析...")
  247. result = agent.analyze_data_quality("员工数据集", sample_data)
  248. if result["success"]:
  249. analysis_result = result["result"]
  250. print("\n✅ 分析结果:")
  251. print(f"整体质量评分: {analysis_result['overall_score']:.2f}")
  252. print(f"完整性比率: {analysis_result['metrics']['completeness_rate']:.2f}")
  253. print(f"唯一值数量: {analysis_result['metrics']['unique_values']}")
  254. print(f"\n📋 改进建议:")
  255. for i, rec in enumerate(analysis_result['recommendations'][:3], 1):
  256. print(f"{i}. {rec}")
  257. if analysis_result['warnings']:
  258. print(f"\n⚠️ 警告信息:")
  259. for warning in analysis_result['warnings'][:2]:
  260. print(f"• {warning}")
  261. else:
  262. print(f"❌ 分析失败: {result['error']}")
  263. # 显示调用历史摘要
  264. print("\n📊 调用历史摘要:")
  265. summary = agent.generate_summary_report()
  266. print(f"总调用次数: {summary['total_calls']}")
  267. print(f"成功率: {summary['success_rate']:.1%}")
  268. print(f"平均耗时: {summary['average_duration']:.2f}秒")
  269. print("\n🎉 高级Agent示例完成!")
  270. print("\n💡 学习要点:")
  271. print("1. Pydantic数据模型: 使用BaseModel定义结构化数据")
  272. print("2. 输出解析器: JsonOutputParser自动解析JSON输出")
  273. print("3. 错误处理: 捕获ValidationError和网络异常")
  274. print("4. 重试机制: 自动重试失败的请求")
  275. print("5. 调用跟踪: 记录所有API调用的历史")
  276. print("6. 结果验证: 使用Pydantic验证输出格式")
  277. print("\n📚 下一步学习:")
  278. print("- 查看项目中的实际Agent代码")
  279. print("- 学习PRACTICE_GUIDE.md中的Phase 4内容")
  280. print("- 尝试修改示例代码,添加新功能")
  281. except Exception as e:
  282. print(f"❌ 运行出错: {e}")
  283. print("\n🔧 故障排除:")
  284. print("1. 检查.env文件中的API密钥")
  285. print("2. 确认网络连接正常")
  286. print("3. 检查pydantic版本: pip show pydantic")
  287. if __name__ == "__main__":
  288. main()