||
- #!/usr/bin/env python3
- """
- 高级Agent示例 - 结构化输出与错误处理
- ===================================
- 这个文件展示了高级Agent功能,包含:
- 1. Pydantic数据模型
- 2. 结构化JSON输出
- 3. 错误处理与重试
- 4. 结果验证
- 5. 日志记录
- 运行方法:
- python examples/advanced_agent.py
- """
- import os
- import sys
- import json
- from typing import Dict, Any, List, Optional
- from datetime import datetime
- from dotenv import load_dotenv
- # 加载环境变量
- load_dotenv()
- try:
- from langchain_openai import ChatOpenAI
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.output_parsers import JsonOutputParser
- from pydantic import BaseModel, Field, ValidationError
- except ImportError as e:
- print(f"❌ 缺少依赖包: {e}")
- print("请运行: pip install langchain langchain-openai pydantic python-dotenv")
- sys.exit(1)
- class AnalysisMetrics(BaseModel):
- """分析指标数据模型"""
- total_records: int = Field(description="总记录数")
- valid_records: int = Field(description="有效记录数")
- invalid_records: int = Field(description="无效记录数")
- completeness_rate: float = Field(description="完整性比率", ge=0, le=1)
- unique_values: int = Field(description="唯一值数量")
- class DataQualityReport(BaseModel):
- """数据质量报告"""
- dataset_name: str = Field(description="数据集名称")
- analysis_date: str = Field(description="分析日期")
- overall_score: float = Field(description="整体质量评分", ge=0, le=1)
- metrics: AnalysisMetrics = Field(description="详细指标")
- recommendations: List[str] = Field(description="改进建议")
- warnings: List[str] = Field(description="警告信息")
- class AdvancedAgent:
- """高级Agent - 支持结构化输出和错误处理"""
- def __init__(self, max_retries: int = 3):
- """初始化高级Agent"""
- api_key = os.getenv('DEEPSEEK_API_KEY')
- if not api_key:
- raise ValueError("请在.env文件中设置DEEPSEEK_API_KEY")
- # 初始化LLM
- self.llm = ChatOpenAI(
- model="deepseek-chat",
- api_key=api_key,
- base_url="https://api.deepseek.com",
- temperature=0.1
- )
- self.max_retries = max_retries
- self.call_history = []
- print("✅ AdvancedAgent初始化完成")
- def create_quality_analysis_prompt(self) -> ChatPromptTemplate:
- """创建数据质量分析提示词"""
- parser = JsonOutputParser(pydantic_object=DataQualityReport)
- template = """你是一个专业的数据质量分析师,请分析提供的数据集并生成详细的质量报告。
- 数据集信息:
- 名称: {dataset_name}
- 记录数量: {record_count}
- 数据样例: {data_sample}
- 请按以下JSON格式输出分析报告:
- {format_instructions}
- 要求:
- 1. 计算总记录数、有效记录数、无效记录数
- 2. 评估数据完整性(0-1之间的分数)
- 3. 识别唯一值数量
- 4. 给出整体质量评分(0-1之间)
- 5. 提供至少2条改进建议
- 6. 如果发现问题,请在warnings中列出
- 确保所有数值字段都是数字类型,字符串字段是字符串类型。"""
- return ChatPromptTemplate.from_template(
- template,
- partial_variables={"format_instructions": parser.get_format_instructions()}
- )
- def analyze_data_quality(self, dataset_name: str, data: List[Dict[str, Any]]) -> Dict[str, Any]:
- """
- 分析数据质量
- Args:
- dataset_name: 数据集名称
- data: 数据列表
- Returns:
- 分析结果
- """
- start_time = datetime.now()
- try:
- # 准备数据样例
- data_sample = json.dumps(data[:3], ensure_ascii=False, indent=2) if data else "无数据"
- # 创建提示词
- prompt = self.create_quality_analysis_prompt()
- chain = prompt | self.llm | JsonOutputParser(pydantic_object=DataQualityReport)
- # 执行分析(带重试机制)
- result = None
- last_error = None
- for attempt in range(self.max_retries):
- try:
- print(f"🔍 执行数据质量分析 (尝试 {attempt + 1}/{self.max_retries})")
- raw_result = chain.invoke({
- "dataset_name": dataset_name,
- "record_count": len(data),
- "data_sample": data_sample
- })
- # 验证和转换结果
- result = DataQualityReport(**raw_result)
- break
- except (ValidationError, json.JSONDecodeError) as e:
- last_error = f"解析错误: {str(e)}"
- print(f"⚠️ 尝试 {attempt + 1} 失败: {last_error}")
- if attempt < self.max_retries - 1:
- continue
- except Exception as e:
- last_error = f"执行错误: {str(e)}"
- print(f"❌ 尝试 {attempt + 1} 失败: {last_error}")
- if attempt < self.max_retries - 1:
- continue
- # 记录调用历史
- end_time = datetime.now()
- call_record = {
- "timestamp": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds(),
- "function": "analyze_data_quality",
- "dataset": dataset_name,
- "success": result is not None,
- "attempts": attempt + 1 if 'attempt' in locals() else 1,
- "error": last_error if result is None else None
- }
- self.call_history.append(call_record)
- if result:
- print("✅ 数据质量分析完成")
- return {
- "success": True,
- "result": result.dict(),
- "call_info": call_record
- }
- else:
- print(f"❌ 数据质量分析失败: {last_error}")
- return {
- "success": False,
- "error": last_error,
- "call_info": call_record
- }
- except Exception as e:
- end_time = datetime.now()
- error_msg = f"意外错误: {str(e)}"
- call_record = {
- "timestamp": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds(),
- "function": "analyze_data_quality",
- "dataset": dataset_name,
- "success": False,
- "attempts": 1,
- "error": error_msg
- }
- self.call_history.append(call_record)
- print(f"❌ 数据质量分析异常: {error_msg}")
- return {
- "success": False,
- "error": error_msg,
- "call_info": call_record
- }
- def generate_summary_report(self) -> Dict[str, Any]:
- """生成调用历史摘要报告"""
- if not self.call_history:
- return {"message": "暂无调用历史"}
- total_calls = len(self.call_history)
- successful_calls = sum(1 for call in self.call_history if call["success"])
- failed_calls = total_calls - successful_calls
- total_duration = sum(call["duration"] for call in self.call_history)
- avg_duration = total_duration / total_calls if total_calls > 0 else 0
- return {
- "total_calls": total_calls,
- "successful_calls": successful_calls,
- "failed_calls": failed_calls,
- "success_rate": successful_calls / total_calls if total_calls > 0 else 0,
- "total_duration": round(total_duration, 2),
- "average_duration": round(avg_duration, 2),
- "call_history": self.call_history[-5:] # 最近5次调用
- }
- def create_sample_data() -> List[Dict[str, Any]]:
- """创建示例数据"""
- return [
- {
- "id": 1,
- "name": "张三",
- "age": 25,
- "city": "北京",
- "salary": 5000,
- "department": "技术部"
- },
- {
- "id": 2,
- "name": "李四",
- "age": 30,
- "city": "上海",
- "salary": 6000,
- "department": "销售部"
- },
- {
- "id": 3,
- "name": "王五",
- "age": None, # 缺失数据
- "city": "广州",
- "salary": None, # 缺失数据
- "department": "技术部"
- },
- {
- "id": 4,
- "name": "赵六",
- "age": 35,
- "city": "深圳",
- "salary": 7000,
- "department": "财务部"
- },
- {
- "id": 5,
- "name": "张三", # 重复数据
- "age": 25,
- "city": "北京",
- "salary": 5000,
- "department": "技术部"
- }
- ]
- def main():
- """主函数 - 演示高级Agent功能"""
- print("🚀 高级Agent示例 - 结构化输出与错误处理")
- print("=" * 60)
- try:
- # 创建Agent实例
- agent = AdvancedAgent(max_retries=2)
- # 准备测试数据
- sample_data = create_sample_data()
- print(f"\n🧪 测试数据:")
- print(f"数据集: 示例员工数据")
- print(f"记录数: {len(sample_data)}")
- print(f"数据样例: {json.dumps(sample_data[0], ensure_ascii=False, indent=2)}")
- # 执行数据质量分析
- print("\n🔍 开始数据质量分析...")
- result = agent.analyze_data_quality("员工数据集", sample_data)
- if result["success"]:
- analysis_result = result["result"]
- print("\n✅ 分析结果:")
- print(f"整体质量评分: {analysis_result['overall_score']:.2f}")
- print(f"完整性比率: {analysis_result['metrics']['completeness_rate']:.2f}")
- print(f"唯一值数量: {analysis_result['metrics']['unique_values']}")
- print(f"\n📋 改进建议:")
- for i, rec in enumerate(analysis_result['recommendations'][:3], 1):
- print(f"{i}. {rec}")
- if analysis_result['warnings']:
- print(f"\n⚠️ 警告信息:")
- for warning in analysis_result['warnings'][:2]:
- print(f"• {warning}")
- else:
- print(f"❌ 分析失败: {result['error']}")
- # 显示调用历史摘要
- print("\n📊 调用历史摘要:")
- summary = agent.generate_summary_report()
- print(f"总调用次数: {summary['total_calls']}")
- print(f"成功率: {summary['success_rate']:.1%}")
- print(f"平均耗时: {summary['average_duration']:.2f}秒")
- print("\n🎉 高级Agent示例完成!")
- print("\n💡 学习要点:")
- print("1. Pydantic数据模型: 使用BaseModel定义结构化数据")
- print("2. 输出解析器: JsonOutputParser自动解析JSON输出")
- print("3. 错误处理: 捕获ValidationError和网络异常")
- print("4. 重试机制: 自动重试失败的请求")
- print("5. 调用跟踪: 记录所有API调用的历史")
- print("6. 结果验证: 使用Pydantic验证输出格式")
- print("\n📚 下一步学习:")
- print("- 查看项目中的实际Agent代码")
- print("- 学习PRACTICE_GUIDE.md中的Phase 4内容")
- print("- 尝试修改示例代码,添加新功能")
- except Exception as e:
- print(f"❌ 运行出错: {e}")
- print("\n🔧 故障排除:")
- print("1. 检查.env文件中的API密钥")
- print("2. 确认网络连接正常")
- print("3. 检查pydantic版本: pip show pydantic")
- if __name__ == "__main__":
- main()
|