通过这个项目,掌握以下核心技能:
交易流水分析系统
├── 规划Agent (PlanningAgent)
│ ├── 分析用户需求
│ ├── 决策下一步行动
│ └── 分配计算任务
├── 大纲生成Agent (OutlineAgent)
│ ├── 生成分析报告结构
│ ├── 选择相关指标
│ └── 输出JSON格式大纲
├── 指标计算Agent (MetricCalculationAgent)
│ ├── 调用外部API
│ ├── 处理计算结果
│ └── 错误处理与重试
└── 工作流编排器 (CompleteAgentFlow)
├── 状态管理
├── 条件路由
└── 执行控制
目标: 掌握Python项目环境搭建
任务:
代码示例:
# 创建虚拟环境
python -m venv tx_flow_env
source tx_flow_env/bin/activate # Linux/Mac
# 或 tx_flow_env\Scripts\activate # Windows
# 安装依赖
pip install langchain langgraph pydantic python-dotenv
# 设置环境变量
echo "DEEPSEEK_API_KEY=your-key-here" > .env
验证代码:
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('DEEPSEEK_API_KEY')
print(f"API Key loaded: {api_key is not None}")
目标: 理解LangChain核心组件
任务:
代码示例:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 初始化模型
llm = ChatOpenAI(
model="deepseek-chat",
api_key="your-api-key",
base_url="https://api.deepseek.com",
temperature=0.1
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的助手,请用JSON格式回答。"),
("user", "请分析这个数据: {data}")
])
# 链式调用
chain = prompt | llm
result = chain.invoke({"data": "示例数据"})
print(result.content)
目标: 掌握Agent基本结构
任务:
代码示例:
from typing import List, Dict, Any
from datetime import datetime
from langchain_openai import ChatOpenAI
class BaseAgent:
"""基础Agent类"""
def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
self.llm = ChatOpenAI(
model="deepseek-chat",
api_key=api_key,
base_url=base_url,
temperature=0.1
)
# API调用跟踪
self.api_calls: List[Dict[str, Any]] = []
def call_llm(self, messages: List[Dict[str, str]]) -> str:
"""调用大模型并记录"""
start_time = datetime.now()
try:
response = self.llm.invoke(messages)
end_time = datetime.now()
# 记录API调用
self.api_calls.append({
"timestamp": end_time.isoformat(),
"duration": (end_time - start_time).total_seconds(),
"success": True,
"response": response.content
})
return response.content
except Exception as e:
end_time = datetime.now()
self.api_calls.append({
"timestamp": end_time.isoformat(),
"duration": (end_time - start_time).total_seconds(),
"success": False,
"error": str(e)
})
raise
目标: 掌握提示词设计技巧
任务:
代码示例:
from pydantic import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser
class AnalysisResult(BaseModel):
"""分析结果结构"""
summary: str = Field(description="分析摘要")
key_points: List[str] = Field(description="关键要点")
confidence: float = Field(description="置信度", ge=0, le=1)
def create_analysis_prompt() -> ChatPromptTemplate:
"""创建分析提示词"""
parser = JsonOutputParser(pydantic_object=AnalysisResult)
template = """你是一个专业的数据分析师,请分析以下数据并按JSON格式输出。
数据: {data}
要求:
1. 提供简洁的分析摘要
2. 列出3-5个关键要点
3. 给出分析置信度(0-1之间)
{format_instructions}
请确保输出有效的JSON格式。"""
return ChatPromptTemplate.from_template(
template,
partial_variables={"format_instructions": parser.get_format_instructions()}
)
目标: 理解状态机概念
任务:
代码示例:
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, START, END
class WorkflowState(TypedDict):
"""工作流状态"""
question: str
current_step: str
results: Annotated[List[str], "add"] # 使用注解支持列表追加
def planning_node(state: WorkflowState) -> WorkflowState:
"""规划节点"""
print(f"📋 规划阶段 - 问题: {state['question']}")
# 简单的决策逻辑
if "分析" in state["question"]:
next_step = "analyze"
else:
next_step = "answer"
return {
**state,
"current_step": next_step,
"results": ["规划完成"]
}
def analysis_node(state: WorkflowState) -> WorkflowState:
"""分析节点"""
print("🔍 分析阶段")
return {
**state,
"results": ["分析完成"]
}
def answer_node(state: WorkflowState) -> WorkflowState:
"""回答节点"""
print("💬 回答阶段")
return {
**state,
"results": ["回答完成"]
}
# 创建工作流
workflow = StateGraph(WorkflowState)
# 添加节点
workflow.add_node("planning", planning_node)
workflow.add_node("analysis", analysis_node)
workflow.add_node("answer", answer_node)
# 设置入口
workflow.set_entry_point("planning")
# 添加边
workflow.add_edge("analysis", END)
workflow.add_edge("answer", END)
# 添加条件边
def route_from_planning(state: WorkflowState) -> str:
return state["current_step"]
workflow.add_conditional_edges(
"planning",
route_from_planning,
{
"analyze": "analysis",
"answer": "answer"
}
)
# 编译并运行
app = workflow.compile()
result = app.invoke({
"question": "请分析这个数据",
"current_step": "",
"results": []
})
print("最终结果:", result)
目标: 掌握复杂工作流设计
任务:
目标: 实现Agent间数据传递
任务:
目标: 构建健壮的Agent系统
任务:
目标: 实现数据加载和预处理
基于项目中的 data_manager.py,实现:
目标: 实现决策制定逻辑
基于项目中的 planning_agent.py,实现:
目标: 实现结构化内容生成
基于项目中的 outline_agent.py,实现:
目标: 实现外部API调用
基于项目中的 rules_engine_metric_calculation_agent.py,实现:
目标: 集成所有组件
基于项目中的 complete_agent_flow_rule.py,实现:
目标: 添加可观测性
基于项目中的日志系统,实现:
# 1. 克隆项目
git clone <repository-url>
cd tx_flow_analysis
# 2. 创建环境
python -m venv venv
source venv/bin/activate
# 3. 安装依赖
pip install -r requirements.txt
# 4. 配置环境变量
cp env_example.txt .env
# 编辑 .env 文件
# 5. 开始练习!
bash
python examples/basic_agent.py
python examples/state_machine.py
# 6. 练习项目结构
examples/ ├── basic_agent.py # 基础Agent示例(LLM调用、提示词设计) ├── state_machine.py # LangGraph状态机示例(工作流设计、条件路由) ├── advanced_agent.py # 高级Agent功能(结构化输出、错误处理、重试机制) └── multi_agent.py # 多Agent协作系统(任务分配、结果传递)
祝你在LangChain和LangGraph的学习之路上取得成功!🚀