test_kafka_mode.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. #!/usr/bin/env python3
  2. """
  3. 测试 Kafka 解耦模式:parse-service → Kafka → schedule-embedding-api
  4. """
  5. import requests
  6. import json
  7. import time
  8. import os
  9. import sys
  10. # 配置
  11. PARSE_SERVICE_URL = "http://localhost:8000"
  12. EMBEDDING_API_URL = "http://localhost:8084"
  13. # 测试文件
  14. TEST_FILE = "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx"
  15. def check_services():
  16. """检查服务状态"""
  17. print("=" * 60)
  18. print("检查服务状态...")
  19. print("=" * 60)
  20. # 检查 parse-service
  21. try:
  22. response = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
  23. if response.status_code == 200:
  24. data = response.json()
  25. kafka_enabled = data.get("kafka_enabled", False)
  26. print(f"✅ parse-service 正常 (Kafka: {kafka_enabled})")
  27. if not kafka_enabled:
  28. print(" ⚠️ Kafka 未启用,请用 KAFKA_ENABLED=true 重启 parse-service")
  29. else:
  30. print(f"❌ parse-service 异常: {response.status_code}")
  31. return False
  32. except Exception as e:
  33. print(f"❌ parse-service 不可达: {e}")
  34. return False
  35. # 检查 schedule-embedding-api
  36. try:
  37. response = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
  38. if response.status_code == 200:
  39. data = response.json()
  40. if data.get("status") == "UP":
  41. print("✅ schedule-embedding-api 正常")
  42. else:
  43. print(f"❌ schedule-embedding-api 状态异常: {data}")
  44. return False
  45. else:
  46. print(f"❌ schedule-embedding-api 异常: {response.status_code}")
  47. return False
  48. except Exception as e:
  49. print(f"❌ schedule-embedding-api 不可达: {e}")
  50. return False
  51. print()
  52. return True
  53. def parse_and_send_to_kafka(file_path, task_id):
  54. """调用 parse-service 解析并发送到 Kafka"""
  55. print(f" 步骤 1/2: 调用 parse-service 解析并发送到 Kafka...")
  56. print(f" 文件: {file_path}")
  57. print(f" 任务ID: {task_id}")
  58. url = f"{PARSE_SERVICE_URL}/api/v1/parse/path?file_path={file_path}&send_to_kafka_flag=true"
  59. try:
  60. response = requests.post(url, json={}, timeout=300)
  61. if response.status_code != 200:
  62. print(f" ❌ HTTP错误: {response.status_code}")
  63. print(f" {response.text}")
  64. return None
  65. data = response.json()
  66. if data.get("code") != 200:
  67. print(f" ❌ 解析失败: {data.get('message')}")
  68. return None
  69. result = data.get("data")
  70. sent_to_kafka = result.get("sent_to_kafka", False)
  71. print(f" ✅ 解析成功!")
  72. print(f" - 文件类型: {result.get('file_type')}")
  73. print(f" - 内容长度: {len(result.get('content', ''))} 字符")
  74. print(f" - 解析耗时: {result.get('parse_time_ms')} ms")
  75. print(f" - 已发送到 Kafka: {sent_to_kafka}")
  76. if not sent_to_kafka:
  77. print(" ❌ 消息未发送到 Kafka!")
  78. return None
  79. return result
  80. except Exception as e:
  81. print(f" ❌ 调用异常: {e}")
  82. return None
  83. def main():
  84. print("\n")
  85. print("╔" + "=" * 58 + "╗")
  86. print("║" + " " * 15 + "Kafka 解耦模式测试" + " " * 21 + "║")
  87. print("╚" + "=" * 58 + "╝")
  88. print()
  89. print("流程: parse-service → Kafka → schedule-embedding-api → Elasticsearch")
  90. print()
  91. # 检查服务
  92. if not check_services():
  93. return 1
  94. # 检查文件是否存在
  95. if not os.path.exists(TEST_FILE):
  96. print(f"❌ 文件不存在: {TEST_FILE}")
  97. return 1
  98. # 生成任务ID
  99. task_id = f"test-kafka-{int(time.time())}"
  100. print("=" * 60)
  101. print(f"开始处理文件...")
  102. print("=" * 60)
  103. print()
  104. print("-" * 60)
  105. print(f"文件: {os.path.basename(TEST_FILE)}")
  106. print("-" * 60)
  107. # 1. 解析文件并发送到 Kafka
  108. parse_result = parse_and_send_to_kafka(TEST_FILE, task_id)
  109. if not parse_result:
  110. return 1
  111. # 2. 等待 schedule-embedding-api 消费
  112. print()
  113. print(f" 步骤 2/2: 等待 schedule-embedding-api 消费 Kafka 消息...")
  114. print(" (请查看 schedule-embedding-api 日志确认消费情况)")
  115. print()
  116. print(" 💡 提示:")
  117. print(" - 消息已发送到 Kafka topic: embedding-topic")
  118. print(" - schedule-embedding-api 应该会自动消费并处理")
  119. print(" - 可以查看 MySQL kafka_processing_log 表确认处理状态")
  120. print()
  121. # 稍等片刻
  122. print(" 等待 5 秒让消费者处理...")
  123. time.sleep(5)
  124. print()
  125. print("=" * 60)
  126. print("测试完成!")
  127. print("=" * 60)
  128. print()
  129. print("✅ parse-service 已成功解析文件并发送到 Kafka")
  130. print()
  131. print("接下来请检查:")
  132. print(" 1. schedule-embedding-api 日志 - 查看消费情况")
  133. print(" 2. MySQL kafka_processing_log 表 - 查看处理记录")
  134. print(" 3. Elasticsearch - 验证文档是否已索引")
  135. print()
  136. return 0
  137. if __name__ == "__main__":
  138. sys.exit(main())