#!/usr/bin/env python3
"""
Advanced agent example - structured output and error handling
=============================================================

This file demonstrates advanced agent features:
1. Pydantic data models
2. Structured JSON output
3. Error handling and retries
4. Result validation
5. Logging (call history)

Usage:
    python examples/advanced_agent.py
"""

import os
import sys
import json
from typing import Dict, Any, List, Optional
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables (e.g. DEEPSEEK_API_KEY) from a .env file.
load_dotenv()

try:
    from langchain_openai import ChatOpenAI
    from langchain_core.exceptions import OutputParserException
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.output_parsers import JsonOutputParser
    from pydantic import BaseModel, Field, ValidationError
except ImportError as e:
    print(f"❌ 缺少依赖包: {e}")
    print("请运行: pip install langchain langchain-openai pydantic python-dotenv")
    sys.exit(1)


# NOTE: Pydantic publishes a model's docstring as the `description` of its
# JSON schema, and that schema is embedded in the prompt through
# `JsonOutputParser.get_format_instructions()`. The docstrings and Field
# descriptions of the two models below are therefore runtime text and are
# kept verbatim rather than translated.
class AnalysisMetrics(BaseModel):
    """分析指标数据模型"""
    total_records: int = Field(description="总记录数")
    valid_records: int = Field(description="有效记录数")
    invalid_records: int = Field(description="无效记录数")
    completeness_rate: float = Field(description="完整性比率", ge=0, le=1)
    unique_values: int = Field(description="唯一值数量")


class DataQualityReport(BaseModel):
    """数据质量报告"""
    dataset_name: str = Field(description="数据集名称")
    analysis_date: str = Field(description="分析日期")
    overall_score: float = Field(description="整体质量评分", ge=0, le=1)
    metrics: AnalysisMetrics = Field(description="详细指标")
    recommendations: List[str] = Field(description="改进建议")
    warnings: List[str] = Field(description="警告信息")


class AdvancedAgent:
    """Agent that asks an LLM for a structured data-quality report.

    Each call to `analyze_data_quality` is retried on failure, validated
    against `DataQualityReport`, and recorded in `call_history`.
    """

    def __init__(self, max_retries: int = 3):
        """Create the LLM client and an empty call history.

        Args:
            max_retries: Maximum number of attempts per analysis call.
                Values below 1 are treated as 1 when the analysis runs.

        Raises:
            ValueError: If the DEEPSEEK_API_KEY environment variable is
                missing or empty.
        """
        api_key = os.getenv('DEEPSEEK_API_KEY')
        if not api_key:
            raise ValueError("请在.env文件中设置DEEPSEEK_API_KEY")

        # DeepSeek is reached through the OpenAI-compatible client by
        # overriding the base URL.
        self.llm = ChatOpenAI(
            model="deepseek-chat",
            api_key=api_key,
            base_url="https://api.deepseek.com",
            temperature=0.1,
        )

        self.max_retries = max_retries
        self.call_history: List[Dict[str, Any]] = []

        print("✅ AdvancedAgent初始化完成")

    def create_quality_analysis_prompt(self) -> ChatPromptTemplate:
        """Build the prompt template for the data-quality analysis.

        Returns:
            A template expecting `dataset_name`, `record_count` and
            `data_sample`; `format_instructions` is pre-filled from the
            `DataQualityReport` schema.
        """
        parser = JsonOutputParser(pydantic_object=DataQualityReport)

        # NOTE(review): the line breaks inside this prompt were reconstructed
        # from a whitespace-collapsed copy of the file; the wording is
        # unchanged. Confirm the layout against the original if it matters.
        template = (
            "你是一个专业的数据质量分析师,请分析提供的数据集并生成详细的质量报告。\n"
            "\n"
            "数据集信息:\n"
            "名称: {dataset_name}\n"
            "记录数量: {record_count}\n"
            "数据样例: {data_sample}\n"
            "\n"
            "请按以下JSON格式输出分析报告:\n"
            "{format_instructions}\n"
            "\n"
            "要求:\n"
            "1. 计算总记录数、有效记录数、无效记录数\n"
            "2. 评估数据完整性(0-1之间的分数)\n"
            "3. 识别唯一值数量\n"
            "4. 给出整体质量评分(0-1之间)\n"
            "5. 提供至少2条改进建议\n"
            "6. 如果发现问题,请在warnings中列出\n"
            "\n"
            "确保所有数值字段都是数字类型,字符串字段是字符串类型。"
        )

        return ChatPromptTemplate.from_template(
            template,
            partial_variables={"format_instructions": parser.get_format_instructions()},
        )

    def _record_call(
        self,
        start_time: datetime,
        dataset_name: str,
        success: bool,
        attempts: int,
        error: Optional[str],
    ) -> Dict[str, Any]:
        """Append one entry to `call_history` and return it."""
        end_time = datetime.now()
        call_record = {
            "timestamp": end_time.isoformat(),
            "duration": (end_time - start_time).total_seconds(),
            "function": "analyze_data_quality",
            "dataset": dataset_name,
            "success": success,
            "attempts": attempts,
            "error": error,
        }
        self.call_history.append(call_record)
        return call_record

    @staticmethod
    def _dump_model(model: Any) -> Dict[str, Any]:
        """Convert a Pydantic model to a dict on both Pydantic v1 and v2."""
        # `.dict()` is deprecated in Pydantic v2 in favour of `model_dump()`;
        # v1 models only offer `.dict()`.
        dump = getattr(model, "model_dump", None)
        if callable(dump):
            return dump()
        return model.dict()

    def analyze_data_quality(self, dataset_name: str, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Ask the LLM for a data-quality report and validate the answer.

        Only the first three records are sent to the model as a sample.
        A failed attempt (invalid output or any other exception) is retried
        until the attempt budget is used up.

        Args:
            dataset_name: Name of the dataset.
            data: List of records; empty or None is reported to the model
                as "no data" with a record count of 0.

        Returns:
            A dict with `success` and `call_info`, plus `result` (the
            validated report as a dict) on success or `error` (the last
            error message) on failure. The method does not raise; every
            failure is reported through the returned dict.
        """
        start_time = datetime.now()
        attempts = 0
        last_error: Optional[str] = None
        result: Optional[DataQualityReport] = None

        try:
            # Prepare the sample shown to the model.
            data_sample = json.dumps(data[:3], ensure_ascii=False, indent=2) if data else "无数据"
            record_count = len(data) if data else 0

            prompt = self.create_quality_analysis_prompt()
            chain = prompt | self.llm | JsonOutputParser(pydantic_object=DataQualityReport)

            # Always make at least one attempt, even if max_retries < 1.
            max_attempts = max(1, self.max_retries)

            for attempts in range(1, max_attempts + 1):
                print(f"🔍 执行数据质量分析 (尝试 {attempts}/{max_attempts})")
                try:
                    raw_result = chain.invoke({
                        "dataset_name": dataset_name,
                        "record_count": record_count,
                        "data_sample": data_sample,
                    })

                    # Validate and convert the parsed JSON.
                    result = DataQualityReport(**raw_result)
                    break

                except (ValidationError, OutputParserException, json.JSONDecodeError) as e:
                    # Malformed or schema-violating model output.
                    last_error = f"解析错误: {str(e)}"
                    print(f"⚠️ 尝试 {attempts} 失败: {last_error}")

                except Exception as e:
                    # Anything else (network, API errors, ...) is also retried.
                    last_error = f"执行错误: {str(e)}"
                    print(f"❌ 尝试 {attempts} 失败: {last_error}")

        except Exception as e:
            # Failure while preparing the request, before any attempt ran.
            error_msg = f"意外错误: {str(e)}"
            call_record = self._record_call(
                start_time, dataset_name, success=False, attempts=1, error=error_msg
            )
            print(f"❌ 数据质量分析异常: {error_msg}")
            return {
                "success": False,
                "error": error_msg,
                "call_info": call_record,
            }

        # Record the call history entry.
        call_record = self._record_call(
            start_time,
            dataset_name,
            success=result is not None,
            attempts=attempts,
            error=last_error if result is None else None,
        )

        if result is not None:
            print("✅ 数据质量分析完成")
            return {
                "success": True,
                "result": self._dump_model(result),
                "call_info": call_record,
            }

        print(f"❌ 数据质量分析失败: {last_error}")
        return {
            "success": False,
            "error": last_error,
            "call_info": call_record,
        }

    def generate_summary_report(self) -> Dict[str, Any]:
        """Summarise the recorded calls.

        Returns:
            `{"message": ...}` when no call has been recorded; otherwise
            counts, success rate, total/average duration (rounded to two
            decimals) and the five most recent call records.
        """
        if not self.call_history:
            return {"message": "暂无调用历史"}

        total_calls = len(self.call_history)
        successful_calls = sum(1 for call in self.call_history if call["success"])
        total_duration = sum(call["duration"] for call in self.call_history)

        return {
            "total_calls": total_calls,
            "successful_calls": successful_calls,
            "failed_calls": total_calls - successful_calls,
            "success_rate": successful_calls / total_calls,
            "total_duration": round(total_duration, 2),
            "average_duration": round(total_duration / total_calls, 2),
            "call_history": self.call_history[-5:],  # five most recent calls
        }


def create_sample_data() -> List[Dict[str, Any]]:
    """Return five sample employee records with missing and duplicate data."""
    return [
        {
            "id": 1,
            "name": "张三",
            "age": 25,
            "city": "北京",
            "salary": 5000,
            "department": "技术部",
        },
        {
            "id": 2,
            "name": "李四",
            "age": 30,
            "city": "上海",
            "salary": 6000,
            "department": "销售部",
        },
        {
            "id": 3,
            "name": "王五",
            "age": None,  # missing value
            "city": "广州",
            "salary": None,  # missing value
            "department": "技术部",
        },
        {
            "id": 4,
            "name": "赵六",
            "age": 35,
            "city": "深圳",
            "salary": 7000,
            "department": "财务部",
        },
        {
            "id": 5,
            "name": "张三",  # duplicate of record 1 apart from the id
            "age": 25,
            "city": "北京",
            "salary": 5000,
            "department": "技术部",
        },
    ]


def main():
    """Run the demo: analyse the sample data and print the results."""
    print("🚀 高级Agent示例 - 结构化输出与错误处理")
    print("=" * 60)

    try:
        # Create the agent.
        agent = AdvancedAgent(max_retries=2)

        # Prepare the test data.
        sample_data = create_sample_data()

        print("\n🧪 测试数据:")
        print("数据集: 示例员工数据")
        print(f"记录数: {len(sample_data)}")
        print(f"数据样例: {json.dumps(sample_data[0], ensure_ascii=False, indent=2)}")

        # Run the data-quality analysis.
        print("\n🔍 开始数据质量分析...")
        result = agent.analyze_data_quality("员工数据集", sample_data)

        if result["success"]:
            analysis_result = result["result"]
            print("\n✅ 分析结果:")
            print(f"整体质量评分: {analysis_result['overall_score']:.2f}")
            print(f"完整性比率: {analysis_result['metrics']['completeness_rate']:.2f}")
            print(f"唯一值数量: {analysis_result['metrics']['unique_values']}")

            print("\n📋 改进建议:")
            for i, rec in enumerate(analysis_result['recommendations'][:3], 1):
                print(f"{i}. {rec}")

            if analysis_result['warnings']:
                print("\n⚠️ 警告信息:")
                for warning in analysis_result['warnings'][:2]:
                    print(f"• {warning}")
        else:
            print(f"❌ 分析失败: {result['error']}")

        # Show the call-history summary. The analysis above always records
        # one call, so the summary has its full set of keys here.
        print("\n📊 调用历史摘要:")
        summary = agent.generate_summary_report()
        print(f"总调用次数: {summary['total_calls']}")
        print(f"成功率: {summary['success_rate']:.1%}")
        print(f"平均耗时: {summary['average_duration']:.2f}秒")

        print("\n🎉 高级Agent示例完成!")
        print("\n💡 学习要点:")
        print("1. Pydantic数据模型: 使用BaseModel定义结构化数据")
        print("2. 输出解析器: JsonOutputParser自动解析JSON输出")
        print("3. 错误处理: 捕获ValidationError和网络异常")
        print("4. 重试机制: 自动重试失败的请求")
        print("5. 调用跟踪: 记录所有API调用的历史")
        print("6. 结果验证: 使用Pydantic验证输出格式")

        print("\n📚 下一步学习:")
        print("- 查看项目中的实际Agent代码")
        print("- 学习PRACTICE_GUIDE.md中的Phase 4内容")
        print("- 尝试修改示例代码,添加新功能")

    except Exception as e:
        # Top-level boundary of the demo script: report and give hints.
        print(f"❌ 运行出错: {e}")
        print("\n🔧 故障排除:")
        print("1. 检查.env文件中的API密钥")
        print("2. 确认网络连接正常")
        print("3. 检查pydantic版本: pip show pydantic")


if __name__ == "__main__":
    main()