#!/usr/bin/env python3 """ SSE + FastAPI 实时对话窗口 用户输入消息,后端随机生成回复 """ import asyncio import json import time import random import os from datetime import datetime from typing import List, Dict, Any from concurrent.futures import ThreadPoolExecutor from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse, HTMLResponse import uvicorn import dotenv # Agent相关导入 try: from agno.agent import Agent from agno.memory.v2.db.sqlite import SqliteMemoryDb from agno.memory.v2.memory import Memory from agno.models.openai import OpenAILike from agno.storage.sqlite import SqliteStorage AGENT_AVAILABLE = True print("✅ Agent依赖已加载") except ImportError as e: print(f"⚠️ Agent依赖未安装: {e}") AGENT_AVAILABLE = False # 加载环境变量 dotenv.load_dotenv() # 全局消息队列 message_queue: List[Dict[str, Any]] = [] # 全局Agent实例和Memory global_agent = None global_memory = None global_user_id = "user_web_chat" # 线程池执行器 thread_executor = ThreadPoolExecutor(max_workers=2) # Agent工具函数 def get_current_time(): """获取当前时间""" return f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" def get_user_info(): """获取用户信息""" return "用户信息: 当前用户正在使用web实时对话窗口" async def call_agent_async(agent, message, user_id): """异步调用Agent,避免阻塞事件循环""" loop = asyncio.get_event_loop() try: print(f"🔄 开始异步调用Agent... (消息: {message[:50]})") # 在线程池中执行同步的Agent调用,设置超时 response = await asyncio.wait_for( loop.run_in_executor( thread_executor, lambda: agent.run(message, user_id=user_id) ), timeout=30.0 # 30秒超时 ) print(f"✅ Agent异步调用完成") return response except asyncio.TimeoutError: print(f"⏰ Agent调用超时 (30秒)") return None except Exception as e: print(f"❌ Agent异步调用失败: {e}") return None def create_agent(): """创建具有Memory功能的Agent实例""" global global_agent, global_memory if not AGENT_AVAILABLE: print("❌ Agent依赖不可用,将使用随机回复") return None # 检查环境变量 api_key = os.getenv("BAILIAN_API_KEY") base_url = os.getenv("BAILIAN_API_BASE_URL") if not api_key or not base_url: print("⚠️ 环境变量未设置,Agent功能将不可用,使用随机回复") return None try: print("🚀 创建具有Memory功能的Agent实例...") # 创建模型 model = OpenAILike( id="qwen3-32b", api_key=api_key, base_url=base_url, request_params={"extra_body": {"enable_thinking": False}}, ) # 数据库文件 db_file = "tmp/agent_memory.db" os.makedirs("tmp", exist_ok=True) # 初始化Memory v2 memory = Memory( model=model, # 使用相同的模型进行记忆管理 db=SqliteMemoryDb(table_name="user_memories", db_file=db_file), ) # 初始化存储 storage = SqliteStorage(table_name="agent_sessions", db_file=db_file) # 定义记忆工具函数 def remember_info(info: str): """主动记住信息的工具函数""" return f"我已经记住了这个信息: {info}" # 创建Agent with Memory功能 agent = Agent( model=model, # Store memories in a database memory=memory, # Give the Agent the ability to update memories enable_agentic_memory=True, # Run the MemoryManager after each response enable_user_memories=True, # Store the chat history in the database storage=storage, # Add the chat history to the messages add_history_to_messages=True, # Number of history runs to include num_history_runs=3, # Tools tools=[get_current_time, get_user_info, remember_info], markdown=False, # 简单文本回复 show_tool_calls=False, # 关闭工具调用显示,避免影响web显示 instructions=""" 你是一个具有记忆功能的友好AI助手,正在通过web实时对话窗口与用户交流。 🧠 **记忆功能**: - 你可以记住用户的姓名、偏好和兴趣 - 保持跨会话的对话连贯性 - 基于历史对话提供个性化建议 - 记住之前讨论过的话题 💬 **对话原则**: - 使用简洁、自然的中文回答 - 语气友好、热情 - 回答要有帮助性 - 可以调用工具获取信息 - 主动记住重要信息 - 基于记忆提供个性化回应 🎯 **个性化服务**: - 如果用户告诉你他们的姓名,主动记住 - 记住用户的偏好和兴趣 - 在后续对话中引用之前的内容 - 提供基于历史的个性化建议 请与用户进行愉快的对话!我会记住我们的每次交流。 """, ) global_agent = agent global_memory = memory print("✅ Memory Agent创建成功!") print(f"📱 模型: qwen3-32b") print(f"🧠 记忆: SQLite数据库 ({db_file})") print(f"💾 存储: 会话历史记录") print(f"👤 用户ID: {global_user_id}") # 简单测试Agent是否正常工作 try: test_response = agent.run("你好", user_id=global_user_id) print(f"🧪 Agent测试成功: {str(test_response)[:50]}...") except Exception as e: print(f"⚠️ Agent测试失败: {e}") return agent except Exception as e: print(f"❌ Agent创建失败: {e}") return None # 随机回复内容(Agent不可用时的备用) RANDOM_REPLIES = [ "这是一个有趣的观点!", "我完全同意你的看法。", "让我想想这个问题...", "你说得很有道理。", "这让我想到了另一个话题。", "非常好的问题!", "我觉得你可以试试这样做。", "这确实是个挑战。", "你的想法很有创意!", "我需要更多信息来帮助你。", "这个话题很深入呢。", "你考虑得很周全。" ] # 创建FastAPI应用 app = FastAPI(title="SSE实时对话", description="简单的实时聊天系统", version="1.0.0") # 应用启动时初始化Agent @app.on_event("startup") async def startup_event(): print("🚀 启动SSE实时对话系统...") print("📍 访问地址: http://localhost:8000") # 初始化Agent try: create_agent() if global_agent: print("✅ Memory Agent已就绪,将提供具有记忆功能的智能回复") print("🧠 记忆功能: 可记住用户信息和对话历史") print("💬 特殊命令: 在对话中输入 '记忆' 查看记忆状态") else: print("⚠️ Agent不可用,将使用随机回复") except Exception as e: print(f"❌ Agent创建过程中出错: {e}") print("⚠️ 系统将使用随机回复模式") @app.get("/") async def home(): """主页 - 对话界面""" html_content = """