test_full_pipeline_final.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. #!/usr/bin/env python3
  2. """
  3. 完整流程测试:parse-service → embedding-api → Elasticsearch
  4. """
  5. import requests
  6. import json
  7. import time
  8. import sys
  9. import os
  10. # 配置
  11. PARSE_SERVICE_URL = "http://localhost:8000"
  12. EMBEDDING_API_URL = "http://localhost:8084"
  13. # 测试文件
  14. TEST_FILES = [
  15. "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
  16. "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
  17. ]
  18. def check_services():
  19. """检查服务状态"""
  20. print("=" * 60)
  21. print("检查服务状态...")
  22. print("=" * 60)
  23. # 检查 parse-service
  24. try:
  25. response = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
  26. if response.status_code == 200:
  27. print("✅ parse-service 正常")
  28. else:
  29. print(f"❌ parse-service 异常: {response.status_code}")
  30. return False
  31. except Exception as e:
  32. print(f"❌ parse-service 不可达: {e}")
  33. print(" 请先启动: cd parse-service && python main.py")
  34. return False
  35. # 检查 schedule-embedding-api
  36. try:
  37. response = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
  38. if response.status_code == 200:
  39. data = response.json()
  40. if data.get("status") == "UP":
  41. print("✅ schedule-embedding-api 正常")
  42. else:
  43. print(f"❌ schedule-embedding-api 状态异常: {data}")
  44. return False
  45. else:
  46. print(f"❌ schedule-embedding-api 异常: {response.status_code}")
  47. return False
  48. except Exception as e:
  49. print(f"❌ schedule-embedding-api 不可达: {e}")
  50. print(" 请先启动 schedule-embedding-api")
  51. return False
  52. print()
  53. return True
  54. def parse_file(file_path, task_id):
  55. """调用 parse-service 解析文件"""
  56. print(f" 步骤 1/2: 调用 parse-service 解析...")
  57. print(f" 文件: {file_path}")
  58. print(f" 任务ID: {task_id}")
  59. url = f"{PARSE_SERVICE_URL}/api/v1/parse/path?file_path={file_path}"
  60. try:
  61. response = requests.post(url, json={}, timeout=300)
  62. if response.status_code != 200:
  63. print(f" ❌ HTTP错误: {response.status_code}")
  64. print(f" {response.text}")
  65. return None
  66. data = response.json()
  67. if data.get("code") != 200:
  68. print(f" ❌ 解析失败: {data.get('message')}")
  69. return None
  70. result = data.get("data")
  71. print(f" ✅ 解析成功!")
  72. print(f" - 文件类型: {result.get('file_type')}")
  73. print(f" - 内容长度: {len(result.get('content', ''))} 字符")
  74. print(f" - 解析耗时: {result.get('parse_time_ms')} ms")
  75. return result
  76. except Exception as e:
  77. print(f" ❌ 调用异常: {e}")
  78. return None
  79. def index_document(task_id, file_path, parse_result):
  80. """调用 embedding-api 向量化并入库"""
  81. print(f" 步骤 2/2: 调用 embedding-api 向量化...")
  82. url = f"{EMBEDDING_API_URL}/api/v1/documents/index"
  83. # 提取文件名
  84. file_name = os.path.basename(file_path)
  85. # 构建请求体
  86. request_body = {
  87. "docId": task_id,
  88. "fileName": file_name,
  89. "fullText": parse_result.get("content", ""),
  90. "filePath": file_path,
  91. "fileType": parse_result.get("file_type"),
  92. "metadata": parse_result.get("metadata", {}),
  93. }
  94. # 添加 fileSize
  95. metadata = parse_result.get("metadata", {})
  96. if metadata and "file_size" in metadata:
  97. request_body["fileSize"] = metadata["file_size"]
  98. try:
  99. response = requests.post(
  100. url,
  101. json=request_body,
  102. headers={"Content-Type": "application/json"},
  103. timeout=120,
  104. )
  105. if response.status_code != 200:
  106. print(f" ❌ HTTP错误: {response.status_code}")
  107. print(f" {response.text}")
  108. return False
  109. data = response.json()
  110. if data.get("success"):
  111. print(f" ✅ 向量化并入库成功!")
  112. return True
  113. else:
  114. print(f" ❌ 向量化失败: {data.get('message')}")
  115. return False
  116. except Exception as e:
  117. print(f" ❌ 调用异常: {e}")
  118. return False
  119. def main():
  120. print("\n")
  121. print("╔" + "=" * 58 + "╗")
  122. print("║" + " " * 15 + "多模态解析 - 完整流程测试" + " " * 15 + "║")
  123. print("╚" + "=" * 58 + "╝")
  124. print()
  125. # 检查服务
  126. if not check_services():
  127. return 1
  128. # 处理每个文件
  129. success_count = 0
  130. fail_count = 0
  131. print("=" * 60)
  132. print(f"开始处理 {len(TEST_FILES)} 个文件...")
  133. print("=" * 60)
  134. for i, file_path in enumerate(TEST_FILES, 1):
  135. print()
  136. print("-" * 60)
  137. print(f"文件 {i}/{len(TEST_FILES)}: {os.path.basename(file_path)}")
  138. print("-" * 60)
  139. if not os.path.exists(file_path):
  140. print(f" ❌ 文件不存在: {file_path}")
  141. fail_count += 1
  142. continue
  143. # 生成任务ID
  144. task_id = f"test-{int(time.time())}-{i}"
  145. # 1. 解析文件
  146. parse_result = parse_file(file_path, task_id)
  147. if not parse_result:
  148. fail_count += 1
  149. continue
  150. # 2. 向量化并入库
  151. if index_document(task_id, file_path, parse_result):
  152. success_count += 1
  153. else:
  154. fail_count += 1
  155. # 稍微延迟一下
  156. time.sleep(1)
  157. # 总结
  158. print()
  159. print("=" * 60)
  160. print("测试总结")
  161. print("=" * 60)
  162. print(f"总计: {len(TEST_FILES)}")
  163. print(f"成功: {success_count}")
  164. print(f"失败: {fail_count}")
  165. if fail_count == 0:
  166. print()
  167. print("🎉 所有测试通过!")
  168. else:
  169. print()
  170. print("⚠️ 部分测试失败")
  171. print("=" * 60)
  172. print()
  173. return 0 if fail_count == 0 else 1
  174. if __name__ == "__main__":
  175. sys.exit(main())