#!/usr/bin/env python3
"""
Test the Kafka decoupled pipeline: parse-service → Kafka → schedule-embedding-api.

The script checks that both services report healthy, asks parse-service to
parse a local file and publish the result to Kafka, then waits briefly so the
consumer has a chance to process the message. Consumer-side results (logs,
MySQL, Elasticsearch) are NOT verified here; the script only prints where to
look.

The service URLs and the test file can be overridden with the
PARSE_SERVICE_URL, EMBEDDING_API_URL and TEST_FILE environment variables.
"""
import json
import os
import sys
import time
import unicodedata

import requests

# Configuration (environment variables override the defaults)
PARSE_SERVICE_URL = os.environ.get("PARSE_SERVICE_URL", "http://localhost:8000")
EMBEDDING_API_URL = os.environ.get("EMBEDDING_API_URL", "http://localhost:8084")

# File to parse in this test
TEST_FILE = os.environ.get(
    "TEST_FILE",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
)

# Inner width (in terminal columns) of the banner box printed by main()
BANNER_WIDTH = 58

# Seconds to wait after publishing so the consumer can pick the message up
CONSUMER_WAIT_SECONDS = 5


def _display_width(text):
    """Return the number of terminal columns *text* occupies.

    East Asian wide/fullwidth characters (e.g. CJK ideographs) are counted as
    two columns; every other character as one.
    """
    return sum(
        2 if unicodedata.east_asian_width(ch) in ("W", "F") else 1 for ch in text
    )


def _print_banner(title, width=BANNER_WIDTH):
    """Print *title* centred inside a box whose inner width is *width* columns."""
    pad = max(width - _display_width(title), 0)
    left = pad // 2
    right = pad - left
    print("╔" + "=" * width + "╗")
    print("║" + " " * left + title + " " * right + "║")
    print("╚" + "=" * width + "╝")


def _decode_json(response, service, prefix=""):
    """Decode *response* as a JSON object, printing an error on failure.

    Args:
        response: A ``requests.Response``.
        service: Service name used in the error message.
        prefix: Text prepended to the error line (for indentation).

    Returns:
        The decoded dict, or None if the body is not valid JSON or is not a
        JSON object.
    """
    try:
        payload = response.json()
    except ValueError as e:  # requests' JSONDecodeError subclasses ValueError
        print(f"{prefix}❌ {service} 返回了无效的 JSON: {e}")
        return None
    if not isinstance(payload, dict):
        print(f"{prefix}❌ {service} 返回了意外的响应: {payload!r}")
        return None
    return payload


def check_services():
    """Check that parse-service and schedule-embedding-api are healthy.

    parse-service counts as healthy when ``GET /health`` returns HTTP 200 with
    a JSON object body. If its ``kafka_enabled`` field is falsy, only a
    warning is printed; the parse step will then fail because nothing is
    sent. schedule-embedding-api counts as healthy when
    ``GET /actuator/health`` returns HTTP 200 with ``"status": "UP"``.

    Returns:
        True if both services are healthy, otherwise False (after printing
        the reason).
    """
    print("=" * 60)
    print("检查服务状态...")
    print("=" * 60)

    # parse-service
    try:
        response = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
    except requests.RequestException as e:
        print(f"❌ parse-service 不可达: {e}")
        return False
    if response.status_code != 200:
        print(f"❌ parse-service 异常: {response.status_code}")
        return False
    data = _decode_json(response, "parse-service")
    if data is None:
        return False
    kafka_enabled = data.get("kafka_enabled", False)
    print(f"✅ parse-service 正常 (Kafka: {kafka_enabled})")
    if not kafka_enabled:
        print(" ⚠️ Kafka 未启用,请用 KAFKA_ENABLED=true 重启 parse-service")

    # schedule-embedding-api
    try:
        response = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
    except requests.RequestException as e:
        print(f"❌ schedule-embedding-api 不可达: {e}")
        return False
    if response.status_code != 200:
        print(f"❌ schedule-embedding-api 异常: {response.status_code}")
        return False
    data = _decode_json(response, "schedule-embedding-api")
    if data is None:
        return False
    if data.get("status") != "UP":
        print(f"❌ schedule-embedding-api 状态异常: {data}")
        return False
    print("✅ schedule-embedding-api 正常")

    print()
    return True


def parse_and_send_to_kafka(file_path, task_id):
    """Ask parse-service to parse *file_path* and publish the result to Kafka.

    Sends ``POST /api/v1/parse/path`` with ``file_path`` and
    ``send_to_kafka_flag=true`` as query parameters. The response is expected
    to be a JSON envelope with ``code``, ``message`` and ``data`` keys, where
    ``data`` carries ``sent_to_kafka``, ``file_type``, ``content`` and
    ``parse_time_ms``.

    Args:
        file_path: Path of the file to parse. It is passed to parse-service
            as-is; presumably the service reads it from its own filesystem.
        task_id: Identifier that is only printed, for correlation.
            NOTE(review): it is not sent to parse-service. Confirm whether the
            API accepts a task id.

    Returns:
        The ``data`` dict on success. None if the request failed, the service
        reported an error, the response was malformed, or the message was not
        sent to Kafka.
    """
    print(" 步骤 1/2: 调用 parse-service 解析并发送到 Kafka...")
    print(f" 文件: {file_path}")
    print(f" 任务ID: {task_id}")

    url = f"{PARSE_SERVICE_URL}/api/v1/parse/path"
    # Pass the query via `params` so requests percent-encodes the path.
    # Interpolating it raw breaks on '&', '#', '+' and similar characters.
    params = {"file_path": file_path, "send_to_kafka_flag": "true"}
    try:
        response = requests.post(url, params=params, json={}, timeout=300)
    except requests.RequestException as e:
        print(f" ❌ 调用异常: {e}")
        return None

    if response.status_code != 200:
        print(f" ❌ HTTP错误: {response.status_code}")
        print(f" {response.text}")
        return None

    data = _decode_json(response, "parse-service", prefix=" ")
    if data is None:
        return None
    if data.get("code") != 200:
        print(f" ❌ 解析失败: {data.get('message')}")
        return None

    result = data.get("data")
    if not isinstance(result, dict):
        print(f" ❌ 响应缺少 data 字段: {data}")
        return None

    sent_to_kafka = result.get("sent_to_kafka", False)
    # `or ""` guards against an explicit null content field.
    content = result.get("content") or ""
    print(" ✅ 解析成功!")
    print(f" - 文件类型: {result.get('file_type')}")
    print(f" - 内容长度: {len(content)} 字符")
    print(f" - 解析耗时: {result.get('parse_time_ms')} ms")
    print(f" - 已发送到 Kafka: {sent_to_kafka}")

    if not sent_to_kafka:
        print(" ❌ 消息未发送到 Kafka!")
        return None

    return result


def main():
    """Run the end-to-end Kafka decoupling test.

    Returns:
        Process exit code. 0 when the file was parsed and published to Kafka.
        1 when a service health check failed, the test file is missing, or
        the parse/publish step failed. Consumer-side processing is not
        verified.
    """
    print("\n")
    _print_banner("Kafka 解耦模式测试")
    print()
    print("流程: parse-service → Kafka → schedule-embedding-api → Elasticsearch")
    print()

    if not check_services():
        return 1

    # Checked locally; assumes parse-service sees the same filesystem.
    if not os.path.isfile(TEST_FILE):
        print(f"❌ 文件不存在: {TEST_FILE}")
        return 1

    task_id = f"test-kafka-{int(time.time())}"

    print("=" * 60)
    print("开始处理文件...")
    print("=" * 60)
    print()
    print("-" * 60)
    print(f"文件: {os.path.basename(TEST_FILE)}")
    print("-" * 60)

    # Step 1: parse the file and publish it to Kafka.
    if not parse_and_send_to_kafka(TEST_FILE, task_id):
        return 1

    # Step 2: the consumer runs asynchronously and this script cannot observe
    # it. It only waits briefly and tells the operator where to look.
    print()
    print(" 步骤 2/2: 等待 schedule-embedding-api 消费 Kafka 消息...")
    print(" (请查看 schedule-embedding-api 日志确认消费情况)")
    print()
    print(" 💡 提示:")
    # NOTE(review): the topic name is hard-coded here, not read from
    # parse-service. Keep it in sync with the service configuration.
    print(" - 消息已发送到 Kafka topic: embedding-topic")
    print(" - schedule-embedding-api 应该会自动消费并处理")
    print(" - 可以查看 MySQL kafka_processing_log 表确认处理状态")
    print()

    print(f" 等待 {CONSUMER_WAIT_SECONDS} 秒让消费者处理...")
    time.sleep(CONSUMER_WAIT_SECONDS)

    print()
    print("=" * 60)
    print("测试完成!")
    print("=" * 60)
    print()
    print("✅ parse-service 已成功解析文件并发送到 Kafka")
    print()
    print("接下来请检查:")
    print(" 1. schedule-embedding-api 日志 - 查看消费情况")
    print(" 2. MySQL kafka_processing_log 表 - 查看处理记录")
    print(" 3. Elasticsearch - 验证文档是否已索引")
    print()

    return 0


if __name__ == "__main__":
    sys.exit(main())