| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- #!/usr/bin/env python3
- """
- 测试 Kafka 解耦模式:parse-service → Kafka → schedule-embedding-api
- """
- import requests
- import json
- import time
- import os
- import sys
# Configuration. Each value can be overridden through an environment variable
# of the same name; the defaults are the values this script originally used.
# Trailing slashes are stripped because the URLs are joined as f"{URL}/path".
PARSE_SERVICE_URL = os.environ.get("PARSE_SERVICE_URL", "http://localhost:8000").rstrip("/")
EMBEDDING_API_URL = os.environ.get("EMBEDDING_API_URL", "http://localhost:8084").rstrip("/")
# Document fed to parse-service. The default is an absolute path on the original
# author's machine, so set TEST_FILE when running anywhere else.
TEST_FILE = os.environ.get(
    "TEST_FILE",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
)
def check_services():
    """Probe the health endpoints of parse-service and schedule-embedding-api.

    parse-service must answer ``GET /health`` with HTTP 200. Its
    ``kafka_enabled`` flag is printed, and a falsy value only produces a
    warning, not a failure. schedule-embedding-api must answer
    ``GET /actuator/health`` with HTTP 200 and a JSON body whose ``status``
    is ``"UP"``.

    Returns:
        True when both checks pass. False as soon as one fails, after the
        reason has been printed. Any exception raised while probing a service
        is reported as that service being unreachable.
    """
    rule = "=" * 60
    for line in (rule, "检查服务状态...", rule):
        print(line)

    # parse-service: a non-200 answer is fatal; a disabled Kafka is only a warning.
    try:
        resp = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
        if resp.status_code != 200:
            print(f"❌ parse-service 异常: {resp.status_code}")
            return False
        kafka_on = resp.json().get("kafka_enabled", False)
        print(f"✅ parse-service 正常 (Kafka: {kafka_on})")
        if not kafka_on:
            print(" ⚠️ Kafka 未启用,请用 KAFKA_ENABLED=true 重启 parse-service")
    except Exception as exc:
        print(f"❌ parse-service 不可达: {exc}")
        return False

    # schedule-embedding-api: needs HTTP 200 *and* an actuator status of "UP".
    try:
        resp = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
        if resp.status_code != 200:
            print(f"❌ schedule-embedding-api 异常: {resp.status_code}")
            return False
        payload = resp.json()
        if payload.get("status") != "UP":
            print(f"❌ schedule-embedding-api 状态异常: {payload}")
            return False
        print("✅ schedule-embedding-api 正常")
    except Exception as exc:
        print(f"❌ schedule-embedding-api 不可达: {exc}")
        return False

    print()
    return True
def parse_and_send_to_kafka(file_path, task_id):
    """Ask parse-service to parse *file_path* and publish the result to Kafka.

    Sends ``POST {PARSE_SERVICE_URL}/api/v1/parse/path`` with ``file_path``
    and ``send_to_kafka_flag=true`` as query parameters and an empty JSON
    body. It then checks the response for all of the following:

    - HTTP 200;
    - envelope ``code == 200``;
    - a ``data`` object;
    - a truthy ``data["sent_to_kafka"]``.

    Args:
        file_path: Path of the document to parse. It is passed to
            parse-service as-is, so it must be valid on the parse-service host.
        task_id: Label for this run. It is only printed and is not sent to
            parse-service.

    Returns:
        The response's ``data`` dict on success, or ``None`` on any failure.
        The reason is printed before returning ``None``.
    """
    print(" 步骤 1/2: 调用 parse-service 解析并发送到 Kafka...")
    print(f" 文件: {file_path}")
    print(f" 任务ID: {task_id}")
    # NOTE(review): task_id is never forwarded to parse-service; confirm whether
    # the API accepts one so the resulting Kafka message can be correlated.

    url = f"{PARSE_SERVICE_URL}/api/v1/parse/path"
    # Let requests build and percent-encode the query string. Interpolating the
    # raw path into the URL breaks on '&', '#', '+' or '%' in file names.
    params = {"file_path": file_path, "send_to_kafka_flag": "true"}
    try:
        response = requests.post(url, params=params, json={}, timeout=300)
        if response.status_code != 200:
            print(f" ❌ HTTP错误: {response.status_code}")
            print(f" {response.text}")
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException: connection/timeout failures; ValueError: body is not JSON.
        print(f" ❌ 调用异常: {e}")
        return None

    if not isinstance(data, dict):
        print(f" ❌ 响应格式异常: {data!r}")
        return None
    if data.get("code") != 200:
        print(f" ❌ 解析失败: {data.get('message')}")
        return None

    result = data.get("data")
    if not isinstance(result, dict):
        print(f" ❌ 响应缺少 data 字段: {data!r}")
        return None

    sent_to_kafka = result.get("sent_to_kafka", False)
    # `or ""` also covers an explicit null "content", which len() would reject.
    content = result.get("content") or ""
    print(" ✅ 解析成功!")
    print(f" - 文件类型: {result.get('file_type')}")
    print(f" - 内容长度: {len(content)} 字符")
    print(f" - 解析耗时: {result.get('parse_time_ms')} ms")
    print(f" - 已发送到 Kafka: {sent_to_kafka}")
    if not sent_to_kafka:
        print(" ❌ 消息未发送到 Kafka!")
        return None
    return result
def _display_width(text):
    """Return how many terminal columns *text* occupies.

    Characters whose East Asian Width is Wide ("W") or Fullwidth ("F"), such
    as CJK ideographs, count as two columns. Everything else counts as one.
    """
    import unicodedata  # local import: only the banner needs it

    return sum(2 if unicodedata.east_asian_width(ch) in ("W", "F") else 1 for ch in text)


def _print_banner(title, width=58, left_pad=15):
    """Print *title* inside a box whose interior is *width* terminal columns wide.

    The right padding is derived from the title's display width. This keeps the
    right border aligned with the corners even when the title contains CJK text.
    """
    right_pad = max(width - left_pad - _display_width(title), 0)
    print("╔" + "=" * width + "╗")
    print("║" + " " * left_pad + title + " " * right_pad + "║")
    print("╚" + "=" * width + "╝")


def main():
    """Run the parse-service → Kafka smoke test once for TEST_FILE.

    Steps:
    1. Check service health.
    2. Confirm TEST_FILE is an existing regular file.
    3. Ask parse-service to parse it and publish to Kafka.
    4. Sleep 5 seconds to give the consumer time to run.

    Consumption by schedule-embedding-api is NOT verified here. The final
    output lists what to inspect manually.

    Returns:
        Process exit code: 0 when the file was parsed and reported as sent to
        Kafka, 1 if any earlier step failed.
    """
    print("\n")
    _print_banner("Kafka 解耦模式测试")
    print()
    print("流程: parse-service → Kafka → schedule-embedding-api → Elasticsearch")
    print()

    if not check_services():
        return 1

    # isfile rather than exists: a directory at this path must not pass.
    if not os.path.isfile(TEST_FILE):
        print(f"❌ 文件不存在: {TEST_FILE}")
        return 1

    # Second-resolution timestamp: unique enough for manual, sequential runs.
    task_id = f"test-kafka-{int(time.time())}"

    print("=" * 60)
    print("开始处理文件...")
    print("=" * 60)
    print()
    print("-" * 60)
    print(f"文件: {os.path.basename(TEST_FILE)}")
    print("-" * 60)

    # Step 1: parse the file and publish it to Kafka.
    parse_result = parse_and_send_to_kafka(TEST_FILE, task_id)
    if not parse_result:
        return 1

    # Step 2: nothing is polled here; the hints point at where to look.
    # NOTE(review): the topic name below is hard-coded text; confirm it matches
    # parse-service's configured topic.
    print()
    print(" 步骤 2/2: 等待 schedule-embedding-api 消费 Kafka 消息...")
    print(" (请查看 schedule-embedding-api 日志确认消费情况)")
    print()
    print(" 💡 提示:")
    print(" - 消息已发送到 Kafka topic: embedding-topic")
    print(" - schedule-embedding-api 应该会自动消费并处理")
    print(" - 可以查看 MySQL kafka_processing_log 表确认处理状态")
    print()

    print(" 等待 5 秒让消费者处理...")
    time.sleep(5)

    print()
    print("=" * 60)
    print("测试完成!")
    print("=" * 60)
    print()
    print("✅ parse-service 已成功解析文件并发送到 Kafka")
    print()
    print("接下来请检查:")
    print(" 1. schedule-embedding-api 日志 - 查看消费情况")
    print(" 2. MySQL kafka_processing_log 表 - 查看处理记录")
    print(" 3. Elasticsearch - 验证文档是否已索引")
    print()
    return 0
if __name__ == "__main__":
    # Run the smoke test; main() returns 0 or 1, which becomes the exit status.
    raise SystemExit(main())
|