#!/usr/bin/env python3
"""Concurrency test script - exercises parse-service's concurrent processing.

Submits a batch of parse+index jobs against a locally running parse-service
and schedule-embedding-api, then reports success counts and timing stats.
"""
import json
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# Service endpoints (local dev defaults).
PARSE_SERVICE_URL = "http://localhost:8000"
EMBEDDING_API_URL = "http://localhost:8084"

# Test files — cycled through repeatedly to simulate concurrent load.
TEST_FILES = [
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
]
  19. def check_services():
  20. """检查服务状态"""
  21. print("=" * 70)
  22. print("检查服务状态...")
  23. print("=" * 70)
  24. # 检查 parse-service
  25. try:
  26. response = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
  27. if response.status_code == 200:
  28. print("✅ parse-service 正常")
  29. else:
  30. print(f"❌ parse-service 异常: {response.status_code}")
  31. return False
  32. except Exception as e:
  33. print(f"❌ parse-service 不可达: {e}")
  34. print(" 请先启动: cd parse-service && python main.py")
  35. return False
  36. # 检查 schedule-embedding-api
  37. try:
  38. response = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
  39. if response.status_code == 200:
  40. data = response.json()
  41. if data.get("status") == "UP":
  42. print("✅ schedule-embedding-api 正常")
  43. else:
  44. print(f"❌ schedule-embedding-api 状态异常: {data}")
  45. return False
  46. else:
  47. print(f"❌ schedule-embedding-api 异常: {response.status_code}")
  48. return False
  49. except Exception as e:
  50. print(f"❌ schedule-embedding-api 不可达: {e}")
  51. return False
  52. print()
  53. return True
  54. def parse_single_file(file_path, task_id):
  55. """解析单个文件(单线程任务)"""
  56. start_time = time.time()
  57. try:
  58. # 步骤 1: 调用 parse-service 解析
  59. url = f"{PARSE_SERVICE_URL}/api/v1/parse/path?file_path={file_path}"
  60. response = requests.post(url, json={}, timeout=300)
  61. if response.status_code != 200:
  62. return {
  63. "task_id": task_id,
  64. "file_path": file_path,
  65. "success": False,
  66. "error": f"HTTP error: {response.status_code}",
  67. "parse_time_ms": 0,
  68. "index_time_ms": 0,
  69. "total_time_ms": 0
  70. }
  71. data = response.json()
  72. if data.get("code") != 200:
  73. return {
  74. "task_id": task_id,
  75. "file_path": file_path,
  76. "success": False,
  77. "error": data.get("message", "Unknown error"),
  78. "parse_time_ms": 0,
  79. "index_time_ms": 0,
  80. "total_time_ms": 0
  81. }
  82. parse_data = data.get("data", {})
  83. parse_time_ms = parse_data.get("parse_time_ms", 0)
  84. parse_time = time.time() - start_time
  85. # 步骤 2: 调用 embedding-api 向量化
  86. index_start_time = time.time()
  87. index_url = f"{EMBEDDING_API_URL}/api/v1/documents/index"
  88. file_name = file_path.split("/")[-1] if "/" in file_path else file_path
  89. index_request = {
  90. "docId": task_id,
  91. "fileName": file_name,
  92. "fullText": parse_data.get("content", ""),
  93. "filePath": file_path,
  94. "fileType": parse_data.get("file_type"),
  95. "metadata": parse_data.get("metadata", {})
  96. }
  97. # 添加 fileSize
  98. metadata = parse_data.get("metadata", {})
  99. if metadata and "file_size" in metadata:
  100. index_request["fileSize"] = metadata["file_size"]
  101. index_response = requests.post(
  102. index_url,
  103. json=index_request,
  104. headers={"Content-Type": "application/json"},
  105. timeout=120
  106. )
  107. index_time = time.time() - index_start_time
  108. total_time = time.time() - start_time
  109. if index_response.status_code != 200:
  110. return {
  111. "task_id": task_id,
  112. "file_path": file_path,
  113. "success": False,
  114. "error": f"Index HTTP error: {index_response.status_code}",
  115. "parse_time_ms": parse_time_ms,
  116. "index_time_ms": int(index_time * 1000),
  117. "total_time_ms": int(total_time * 1000)
  118. }
  119. index_data = index_response.json()
  120. index_success = index_data.get("success", False)
  121. return {
  122. "task_id": task_id,
  123. "file_path": file_path,
  124. "success": index_success,
  125. "error": None if index_success else index_data.get("message", "Index failed"),
  126. "parse_time_ms": parse_time_ms,
  127. "index_time_ms": int(index_time * 1000),
  128. "total_time_ms": int(total_time * 1000)
  129. }
  130. except Exception as e:
  131. total_time = time.time() - start_time
  132. return {
  133. "task_id": task_id,
  134. "file_path": file_path,
  135. "success": False,
  136. "error": str(e),
  137. "parse_time_ms": 0,
  138. "index_time_ms": 0,
  139. "total_time_ms": int(total_time * 1000)
  140. }
  141. def run_concurrent_test(num_tasks=10, num_workers=5):
  142. """运行并发测试"""
  143. print("=" * 70)
  144. print(f"并发测试: {num_tasks} 个任务, {num_workers} 个工作线程")
  145. print("=" * 70)
  146. # 准备任务列表(循环使用测试文件)
  147. tasks = []
  148. for i in range(num_tasks):
  149. file_path = TEST_FILES[i % len(TEST_FILES)]
  150. task_id = f"concurrent-test-{uuid.uuid4().hex[:8]}"
  151. tasks.append((file_path, task_id))
  152. print(f"\n准备执行 {len(tasks)} 个任务...")
  153. for i, (file_path, task_id) in enumerate(tasks[:5]): # 只显示前5个
  154. file_name = file_path.split("/")[-1]
  155. print(f" [{task_id}] {file_name}")
  156. if len(tasks) > 5:
  157. print(f" ... 还有 {len(tasks) - 5} 个任务")
  158. print("\n开始并发执行...")
  159. print("-" * 70)
  160. start_time = time.time()
  161. results = []
  162. # 使用线程池执行
  163. with ThreadPoolExecutor(max_workers=num_workers) as executor:
  164. # 提交所有任务
  165. future_to_task = {
  166. executor.submit(parse_single_file, file_path, task_id): (file_path, task_id)
  167. for file_path, task_id in tasks
  168. }
  169. # 收集结果
  170. completed = 0
  171. for future in as_completed(future_to_task):
  172. file_path, task_id = future_to_task[future]
  173. try:
  174. result = future.result()
  175. results.append(result)
  176. completed += 1
  177. # 显示进度
  178. file_name = file_path.split("/")[-1]
  179. status = "✅" if result["success"] else "❌"
  180. print(f"[{completed}/{num_tasks}] {status} {task_id} - {file_name} "
  181. f"(总耗时: {result['total_time_ms']}ms)")
  182. except Exception as e:
  183. print(f"❌ 任务异常: {task_id} - {e}")
  184. results.append({
  185. "task_id": task_id,
  186. "file_path": file_path,
  187. "success": False,
  188. "error": str(e),
  189. "parse_time_ms": 0,
  190. "index_time_ms": 0,
  191. "total_time_ms": 0
  192. })
  193. total_time = time.time() - start_time
  194. # 统计结果
  195. success_count = sum(1 for r in results if r["success"])
  196. fail_count = len(results) - success_count
  197. # 计算平均耗时(仅统计成功的任务)
  198. successful_results = [r for r in results if r["success"]]
  199. if successful_results:
  200. avg_parse_time = sum(r["parse_time_ms"] for r in successful_results) / len(successful_results)
  201. avg_index_time = sum(r["index_time_ms"] for r in successful_results) / len(successful_results)
  202. avg_total_time = sum(r["total_time_ms"] for r in successful_results) / len(successful_results)
  203. else:
  204. avg_parse_time = avg_index_time = avg_total_time = 0
  205. # 打印总结
  206. print("\n" + "=" * 70)
  207. print("并发测试总结")
  208. print("=" * 70)
  209. print(f"总任务数: {len(results)}")
  210. print(f"并发数: {num_workers}")
  211. print(f"成功: {success_count}")
  212. print(f"失败: {fail_count}")
  213. print(f"总耗时: {total_time:.2f} 秒")
  214. print(f"吞吐量: {len(results)/total_time:.2f} 任务/秒")
  215. print()
  216. if successful_results:
  217. print("平均耗时(仅成功任务):")
  218. print(f" - 解析: {avg_parse_time:.0f} ms")
  219. print(f" - 向量化: {avg_index_time:.0f} ms")
  220. print(f" - 总计: {avg_total_time:.0f} ms")
  221. if fail_count > 0:
  222. print("\n失败任务详情:")
  223. for r in results:
  224. if not r["success"]:
  225. file_name = r["file_path"].split("/")[-1]
  226. print(f" - [{r['task_id']}] {file_name}: {r['error']}")
  227. print("=" * 70)
  228. return results
  229. def main():
  230. print("\n")
  231. print("╔" + "=" * 68 + "╗")
  232. print("║" + " " * 18 + "多模态解析 - 并发测试" + " " * 24 + "║")
  233. print("╚" + "=" * 68 + "╝")
  234. print()
  235. # 检查服务
  236. if not check_services():
  237. return 1
  238. # 测试配置
  239. print("请选择测试模式:")
  240. print(" 1. 小规模测试 (5 任务, 2 并发)")
  241. print(" 2. 中规模测试 (10 任务, 5 并发)")
  242. print(" 3. 大规模测试 (20 任务, 10 并发)")
  243. print(" 4. 自定义...")
  244. try:
  245. choice = input("\n请输入选项 (1-4, 默认 2): ").strip()
  246. if choice == "1":
  247. num_tasks, num_workers = 5, 2
  248. elif choice == "3":
  249. num_tasks, num_workers = 20, 10
  250. elif choice == "4":
  251. num_tasks = int(input("请输入任务数量: "))
  252. num_workers = int(input("请输入并发数: "))
  253. else:
  254. num_tasks, num_workers = 10, 5
  255. except KeyboardInterrupt:
  256. print("\n\n用户中断")
  257. return 1
  258. except Exception as e:
  259. print(f"\n输入错误: {e},使用默认配置 (10 任务, 5 并发)")
  260. num_tasks, num_workers = 10, 5
  261. # 运行测试
  262. try:
  263. results = run_concurrent_test(num_tasks, num_workers)
  264. return 0 if all(r["success"] for r in results) else 1
  265. except KeyboardInterrupt:
  266. print("\n\n用户中断")
  267. return 1
  268. if __name__ == "__main__":
  269. exit(main())