#!/usr/bin/env python3
"""Concurrency test script for parse-service.

Submits a configurable number of "parse, then index" tasks through a thread
pool, prints per-task progress, and finishes with a summary (success/failure
counts, throughput, and average timings of the successful tasks).

Each task POSTs a file path to parse-service and forwards the parsed content
to the embedding API for indexing.
"""

import json
import sys
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# Service endpoints.
PARSE_SERVICE_URL = "http://localhost:8000"
EMBEDDING_API_URL = "http://localhost:8084"

# Test files; the list is cycled through so a few files can feed many tasks.
TEST_FILES = [
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
]


def _file_name(file_path):
    """Return the last '/'-separated component of *file_path*."""
    return file_path.rsplit("/", 1)[-1]


def _task_result(task_id, file_path, success, error=None,
                 parse_time_ms=0, index_time_ms=0, total_time_ms=0):
    """Build the per-task result dict shared by every success/failure path."""
    return {
        "task_id": task_id,
        "file_path": file_path,
        "success": success,
        "error": error,
        "parse_time_ms": parse_time_ms,
        "index_time_ms": index_time_ms,
        "total_time_ms": total_time_ms,
    }


def _elapsed_ms(start):
    """Return whole milliseconds elapsed since the perf_counter value *start*."""
    return int((time.perf_counter() - start) * 1000)


def check_services():
    """Check that both backing services answer their health endpoints.

    Returns:
        True when parse-service returns HTTP 200 on ``/health`` and the
        embedding API returns HTTP 200 with ``status == "UP"`` on
        ``/actuator/health``; False otherwise (a diagnostic is printed).
    """
    print("=" * 70)
    print("检查服务状态...")
    print("=" * 70)

    # parse-service
    try:
        response = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
        if response.status_code == 200:
            print("✅ parse-service 正常")
        else:
            print(f"❌ parse-service 异常: {response.status_code}")
            return False
    except Exception as e:  # boundary: any failure means "not reachable"
        print(f"❌ parse-service 不可达: {e}")
        print(" 请先启动: cd parse-service && python main.py")
        return False

    # schedule-embedding-api
    try:
        response = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "UP":
                print("✅ schedule-embedding-api 正常")
            else:
                print(f"❌ schedule-embedding-api 状态异常: {data}")
                return False
        else:
            print(f"❌ schedule-embedding-api 异常: {response.status_code}")
            return False
    except Exception as e:  # boundary: any failure means "not reachable"
        print(f"❌ schedule-embedding-api 不可达: {e}")
        return False

    print()
    return True


def parse_single_file(file_path, task_id):
    """Parse one file and index the result (the unit of work for one thread).

    Args:
        file_path: Path sent to parse-service as the ``file_path`` query
            parameter.
        task_id: Identifier for this task; also sent as ``docId`` to the
            embedding API.

    Returns:
        A result dict with keys ``task_id``, ``file_path``, ``success``,
        ``error``, ``parse_time_ms``, ``index_time_ms`` and ``total_time_ms``.
        This function never raises; any exception is reported in ``error``.
    """
    start_time = time.perf_counter()

    try:
        # Step 1: ask parse-service to parse the file.  The path is passed via
        # ``params`` so that characters such as '&', '#', '+' or spaces are
        # percent-encoded instead of corrupting the query string.
        response = requests.post(
            f"{PARSE_SERVICE_URL}/api/v1/parse/path",
            params={"file_path": file_path},
            json={},
            timeout=300,
        )

        if response.status_code != 200:
            return _task_result(
                task_id, file_path, False,
                error=f"HTTP error: {response.status_code}",
                total_time_ms=_elapsed_ms(start_time),
            )

        data = response.json()
        if data.get("code") != 200:
            return _task_result(
                task_id, file_path, False,
                error=data.get("message", "Unknown error"),
                total_time_ms=_elapsed_ms(start_time),
            )

        parse_data = data.get("data", {})
        # Parse duration as reported by the service itself (0 if absent).
        parse_time_ms = parse_data.get("parse_time_ms", 0)
        metadata = parse_data.get("metadata", {})

        # Step 2: send the parsed content to the embedding API for indexing.
        index_start_time = time.perf_counter()
        index_request = {
            "docId": task_id,
            "fileName": _file_name(file_path),
            "fullText": parse_data.get("content", ""),
            "filePath": file_path,
            "fileType": parse_data.get("file_type"),
            "metadata": metadata,
        }
        # fileSize is only forwarded when the parser reported it.
        if metadata and "file_size" in metadata:
            index_request["fileSize"] = metadata["file_size"]

        index_response = requests.post(
            f"{EMBEDDING_API_URL}/api/v1/documents/index",
            json=index_request,
            headers={"Content-Type": "application/json"},
            timeout=120,
        )

        index_time_ms = _elapsed_ms(index_start_time)
        total_time_ms = _elapsed_ms(start_time)

        if index_response.status_code != 200:
            return _task_result(
                task_id, file_path, False,
                error=f"Index HTTP error: {index_response.status_code}",
                parse_time_ms=parse_time_ms,
                index_time_ms=index_time_ms,
                total_time_ms=total_time_ms,
            )

        index_data = index_response.json()
        index_success = index_data.get("success", False)
        return _task_result(
            task_id, file_path, index_success,
            error=None if index_success else index_data.get("message", "Index failed"),
            parse_time_ms=parse_time_ms,
            index_time_ms=index_time_ms,
            total_time_ms=total_time_ms,
        )

    except Exception as e:  # boundary: a worker must always return a result
        return _task_result(
            task_id, file_path, False,
            error=str(e),
            total_time_ms=_elapsed_ms(start_time),
        )


def run_concurrent_test(num_tasks=10, num_workers=5):
    """Run *num_tasks* parse/index tasks on a pool of *num_workers* threads.

    Args:
        num_tasks: Number of tasks to submit; TEST_FILES is cycled to fill them.
        num_workers: Thread pool size.

    Returns:
        The list of per-task result dicts, in completion order.

    Raises:
        ValueError: from ThreadPoolExecutor when *num_workers* is not positive.
    """
    print("=" * 70)
    print(f"并发测试: {num_tasks} 个任务, {num_workers} 个工作线程")
    print("=" * 70)

    # Build the task list, cycling through the test files.
    tasks = [
        (TEST_FILES[i % len(TEST_FILES)], f"concurrent-test-{uuid.uuid4().hex[:8]}")
        for i in range(num_tasks)
    ]

    print(f"\n准备执行 {len(tasks)} 个任务...")
    for file_path, task_id in tasks[:5]:  # preview only the first five
        print(f" [{task_id}] {_file_name(file_path)}")
    if len(tasks) > 5:
        print(f" ... 还有 {len(tasks) - 5} 个任务")

    print("\n开始并发执行...")
    print("-" * 70)

    start_time = time.perf_counter()
    results = []

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Submit everything up front; map each future back to its task.
        future_to_task = {
            executor.submit(parse_single_file, file_path, task_id): (file_path, task_id)
            for file_path, task_id in tasks
        }

        completed = 0
        for future in as_completed(future_to_task):
            file_path, task_id = future_to_task[future]
            try:
                result = future.result()
                results.append(result)
                completed += 1

                status = "✅" if result["success"] else "❌"
                print(f"[{completed}/{num_tasks}] {status} {task_id} - {_file_name(file_path)} "
                      f"(总耗时: {result['total_time_ms']}ms)")
            except Exception as e:  # defensive: the worker normally never raises
                print(f"❌ 任务异常: {task_id} - {e}")
                results.append(_task_result(task_id, file_path, False, error=str(e)))

    total_time = time.perf_counter() - start_time

    # Aggregate statistics; averages cover successful tasks only.
    successful_results = [r for r in results if r["success"]]
    success_count = len(successful_results)
    fail_count = len(results) - success_count

    if successful_results:
        avg_parse_time = sum(r["parse_time_ms"] for r in successful_results) / success_count
        avg_index_time = sum(r["index_time_ms"] for r in successful_results) / success_count
        avg_total_time = sum(r["total_time_ms"] for r in successful_results) / success_count
    else:
        avg_parse_time = avg_index_time = avg_total_time = 0

    # Guard against a zero interval (e.g. no tasks on a coarse clock).
    throughput = len(results) / total_time if total_time > 0 else 0.0

    print("\n" + "=" * 70)
    print("并发测试总结")
    print("=" * 70)
    print(f"总任务数: {len(results)}")
    print(f"并发数: {num_workers}")
    print(f"成功: {success_count}")
    print(f"失败: {fail_count}")
    print(f"总耗时: {total_time:.2f} 秒")
    print(f"吞吐量: {throughput:.2f} 任务/秒")
    print()

    if successful_results:
        print("平均耗时(仅成功任务):")
        print(f" - 解析: {avg_parse_time:.0f} ms")
        print(f" - 向量化: {avg_index_time:.0f} ms")
        print(f" - 总计: {avg_total_time:.0f} ms")

    if fail_count > 0:
        print("\n失败任务详情:")
        for r in results:
            if not r["success"]:
                print(f" - [{r['task_id']}] {_file_name(r['file_path'])}: {r['error']}")

    print("=" * 70)

    return results


def main():
    """Interactive entry point: pick a test size, run it, return an exit code.

    Returns:
        0 when at least one task ran and every task succeeded; 1 otherwise
        (service check failed, user interrupt, or any task failed).
    """
    print("\n")
    print("╔" + "=" * 68 + "╗")
    print("║" + " " * 18 + "多模态解析 - 并发测试" + " " * 24 + "║")
    print("╚" + "=" * 68 + "╝")
    print()

    if not check_services():
        return 1

    print("请选择测试模式:")
    print(" 1. 小规模测试 (5 任务, 2 并发)")
    print(" 2. 中规模测试 (10 任务, 5 并发)")
    print(" 3. 大规模测试 (20 任务, 10 并发)")
    print(" 4. 自定义...")

    try:
        choice = input("\n请输入选项 (1-4, 默认 2): ").strip()

        if choice == "1":
            num_tasks, num_workers = 5, 2
        elif choice == "3":
            num_tasks, num_workers = 20, 10
        elif choice == "4":
            num_tasks = int(input("请输入任务数量: "))
            num_workers = int(input("请输入并发数: "))
            # Zero/negative values would crash the thread pool or yield an
            # empty, vacuously "successful" run; treat them as input errors.
            if num_tasks < 1 or num_workers < 1:
                raise ValueError("任务数量和并发数必须为正整数")
        else:
            num_tasks, num_workers = 10, 5
    except KeyboardInterrupt:
        print("\n\n用户中断")
        return 1
    except (ValueError, EOFError) as e:
        print(f"\n输入错误: {e},使用默认配置 (10 任务, 5 并发)")
        num_tasks, num_workers = 10, 5

    try:
        results = run_concurrent_test(num_tasks, num_workers)
        # An empty result list is not a success.
        return 0 if results and all(r["success"] for r in results) else 1
    except KeyboardInterrupt:
        print("\n\n用户中断")
        return 1


if __name__ == "__main__":
    sys.exit(main())