| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- #!/usr/bin/env python3
- """
- 并发测试脚本 - 测试 parse-service 的并发处理能力
- """
import json
import os
import sys
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
# Configuration. The service base URLs can be overridden through the
# PARSE_SERVICE_URL / EMBEDDING_API_URL environment variables so the script
# can target a non-local deployment without editing the source. A trailing "/"
# is stripped because endpoint paths are appended with a leading "/".
PARSE_SERVICE_URL = os.environ.get("PARSE_SERVICE_URL", "http://localhost:8000").rstrip("/")
EMBEDDING_API_URL = os.environ.get("EMBEDDING_API_URL", "http://localhost:8084").rstrip("/")

# Test files. run_concurrent_test cycles through this list, so the same files
# are reused to build more tasks than there are entries. These are absolute,
# machine-specific paths that are sent to parse-service as-is — adjust locally.
TEST_FILES = [
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
]
def check_services():
    """Probe the health endpoints of parse-service and schedule-embedding-api.

    Prints one status line per service. Any exception raised while probing a
    service (connection error, timeout, non-JSON body, ...) is reported as
    "unreachable" for that service.

    Returns:
        True if both services report healthy, otherwise False (stopping at the
        first failing service).
    """
    banner = "=" * 70
    print(banner)
    print("检查服务状态...")
    print(banner)

    # parse-service: healthy means GET /health answers with HTTP 200.
    try:
        parse_resp = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
        if parse_resp.status_code != 200:
            print(f"❌ parse-service 异常: {parse_resp.status_code}")
            return False
        print("✅ parse-service 正常")
    except Exception as exc:
        print(f"❌ parse-service 不可达: {exc}")
        print(" 请先启动: cd parse-service && python main.py")
        return False

    # schedule-embedding-api: needs HTTP 200 AND a JSON body with status "UP".
    try:
        embed_resp = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
        if embed_resp.status_code != 200:
            print(f"❌ schedule-embedding-api 异常: {embed_resp.status_code}")
            return False
        health = embed_resp.json()
        if health.get("status") != "UP":
            print(f"❌ schedule-embedding-api 状态异常: {health}")
            return False
        print("✅ schedule-embedding-api 正常")
    except Exception as exc:
        print(f"❌ schedule-embedding-api 不可达: {exc}")
        return False

    print()
    return True
def _elapsed_ms(since):
    """Return whole milliseconds elapsed since the perf_counter() value *since*."""
    return int((time.perf_counter() - since) * 1000)


def _task_result(task_id, file_path, success, error, *,
                 parse_time_ms=0, index_time_ms=0, total_time_ms=0):
    """Build the per-task result dict that run_concurrent_test aggregates."""
    return {
        "task_id": task_id,
        "file_path": file_path,
        "success": success,
        "error": error,
        "parse_time_ms": parse_time_ms,
        "index_time_ms": index_time_ms,
        "total_time_ms": total_time_ms,
    }


def parse_single_file(file_path, task_id):
    """Parse one file with parse-service, then index the text via the embedding API.

    Intended to run as a single worker-thread task. Ordinary failures (HTTP
    errors, non-200 business codes, network errors, bad JSON) are reported
    through the returned dict instead of being raised.

    Args:
        file_path: Path handed to parse-service's ``/api/v1/parse/path``
            endpoint as the ``file_path`` query parameter.
        task_id: Identifier for this task; also sent as ``docId`` when indexing.

    Returns:
        A dict with keys ``task_id``, ``file_path``, ``success``, ``error``
        (None on success), ``parse_time_ms`` (as reported by parse-service),
        and ``index_time_ms`` / ``total_time_ms`` (measured client-side).
    """
    start = time.perf_counter()  # monotonic clock: safe for measuring durations
    try:
        # Step 1: parse. Passing file_path via params= lets requests URL-encode
        # it; interpolating it into the URL breaks on '&', '#', '+', '%', ' '.
        response = requests.post(
            f"{PARSE_SERVICE_URL}/api/v1/parse/path",
            params={"file_path": file_path},
            json={},
            timeout=300,
        )
        if response.status_code != 200:
            return _task_result(task_id, file_path, False,
                                f"HTTP error: {response.status_code}",
                                total_time_ms=_elapsed_ms(start))

        data = response.json()
        if data.get("code") != 200:
            return _task_result(task_id, file_path, False,
                                data.get("message", "Unknown error"),
                                total_time_ms=_elapsed_ms(start))

        # A missing "data" key and an explicit JSON null are treated alike.
        parse_data = data.get("data") or {}
        parse_time_ms = parse_data.get("parse_time_ms", 0)
        metadata = parse_data.get("metadata", {})

        # Step 2: send the parsed text to the embedding API for indexing.
        index_start = time.perf_counter()
        index_request = {
            "docId": task_id,
            "fileName": file_path.rsplit("/", 1)[-1],
            "fullText": parse_data.get("content", ""),
            "filePath": file_path,
            "fileType": parse_data.get("file_type"),
            "metadata": metadata,
        }
        # Only send fileSize when parse-service reported one.
        if metadata and "file_size" in metadata:
            index_request["fileSize"] = metadata["file_size"]

        index_response = requests.post(
            f"{EMBEDDING_API_URL}/api/v1/documents/index",
            json=index_request,
            headers={"Content-Type": "application/json"},
            timeout=120,
        )
        index_time_ms = _elapsed_ms(index_start)
        total_time_ms = _elapsed_ms(start)

        if index_response.status_code != 200:
            return _task_result(task_id, file_path, False,
                                f"Index HTTP error: {index_response.status_code}",
                                parse_time_ms=parse_time_ms,
                                index_time_ms=index_time_ms,
                                total_time_ms=total_time_ms)

        index_data = index_response.json()
        index_success = index_data.get("success", False)
        return _task_result(
            task_id,
            file_path,
            index_success,
            None if index_success else index_data.get("message", "Index failed"),
            parse_time_ms=parse_time_ms,
            index_time_ms=index_time_ms,
            total_time_ms=total_time_ms,
        )
    except Exception as e:
        # Deliberate catch-all: one failing task must not abort the whole run.
        return _task_result(task_id, file_path, False, str(e),
                            total_time_ms=_elapsed_ms(start))
- def run_concurrent_test(num_tasks=10, num_workers=5):
- """运行并发测试"""
- print("=" * 70)
- print(f"并发测试: {num_tasks} 个任务, {num_workers} 个工作线程")
- print("=" * 70)
- # 准备任务列表(循环使用测试文件)
- tasks = []
- for i in range(num_tasks):
- file_path = TEST_FILES[i % len(TEST_FILES)]
- task_id = f"concurrent-test-{uuid.uuid4().hex[:8]}"
- tasks.append((file_path, task_id))
- print(f"\n准备执行 {len(tasks)} 个任务...")
- for i, (file_path, task_id) in enumerate(tasks[:5]): # 只显示前5个
- file_name = file_path.split("/")[-1]
- print(f" [{task_id}] {file_name}")
- if len(tasks) > 5:
- print(f" ... 还有 {len(tasks) - 5} 个任务")
- print("\n开始并发执行...")
- print("-" * 70)
- start_time = time.time()
- results = []
- # 使用线程池执行
- with ThreadPoolExecutor(max_workers=num_workers) as executor:
- # 提交所有任务
- future_to_task = {
- executor.submit(parse_single_file, file_path, task_id): (file_path, task_id)
- for file_path, task_id in tasks
- }
- # 收集结果
- completed = 0
- for future in as_completed(future_to_task):
- file_path, task_id = future_to_task[future]
- try:
- result = future.result()
- results.append(result)
- completed += 1
- # 显示进度
- file_name = file_path.split("/")[-1]
- status = "✅" if result["success"] else "❌"
- print(f"[{completed}/{num_tasks}] {status} {task_id} - {file_name} "
- f"(总耗时: {result['total_time_ms']}ms)")
- except Exception as e:
- print(f"❌ 任务异常: {task_id} - {e}")
- results.append({
- "task_id": task_id,
- "file_path": file_path,
- "success": False,
- "error": str(e),
- "parse_time_ms": 0,
- "index_time_ms": 0,
- "total_time_ms": 0
- })
- total_time = time.time() - start_time
- # 统计结果
- success_count = sum(1 for r in results if r["success"])
- fail_count = len(results) - success_count
- # 计算平均耗时(仅统计成功的任务)
- successful_results = [r for r in results if r["success"]]
- if successful_results:
- avg_parse_time = sum(r["parse_time_ms"] for r in successful_results) / len(successful_results)
- avg_index_time = sum(r["index_time_ms"] for r in successful_results) / len(successful_results)
- avg_total_time = sum(r["total_time_ms"] for r in successful_results) / len(successful_results)
- else:
- avg_parse_time = avg_index_time = avg_total_time = 0
- # 打印总结
- print("\n" + "=" * 70)
- print("并发测试总结")
- print("=" * 70)
- print(f"总任务数: {len(results)}")
- print(f"并发数: {num_workers}")
- print(f"成功: {success_count}")
- print(f"失败: {fail_count}")
- print(f"总耗时: {total_time:.2f} 秒")
- print(f"吞吐量: {len(results)/total_time:.2f} 任务/秒")
- print()
- if successful_results:
- print("平均耗时(仅成功任务):")
- print(f" - 解析: {avg_parse_time:.0f} ms")
- print(f" - 向量化: {avg_index_time:.0f} ms")
- print(f" - 总计: {avg_total_time:.0f} ms")
- if fail_count > 0:
- print("\n失败任务详情:")
- for r in results:
- if not r["success"]:
- file_name = r["file_path"].split("/")[-1]
- print(f" - [{r['task_id']}] {file_name}: {r['error']}")
- print("=" * 70)
- return results
def main():
    """Interactive entry point: check services, choose a test size, run the test.

    Returns:
        Process exit code: 0 when every task succeeded; 1 when a service check
        failed, the user pressed Ctrl+C, or at least one task failed.
    """
    print("\n")
    print("╔" + "=" * 68 + "╗")
    print("║" + " " * 18 + "多模态解析 - 并发测试" + " " * 24 + "║")
    print("╚" + "=" * 68 + "╝")
    print()

    # Bail out early if either backend is unavailable.
    if not check_services():
        return 1

    # Test configuration menu.
    print("请选择测试模式:")
    print(" 1. 小规模测试 (5 任务, 2 并发)")
    print(" 2. 中规模测试 (10 任务, 5 并发)")
    print(" 3. 大规模测试 (20 任务, 10 并发)")
    print(" 4. 自定义...")
    try:
        choice = input("\n请输入选项 (1-4, 默认 2): ").strip()
        if choice == "1":
            num_tasks, num_workers = 5, 2
        elif choice == "3":
            num_tasks, num_workers = 20, 10
        elif choice == "4":
            num_tasks = int(input("请输入任务数量: "))
            num_workers = int(input("请输入并发数: "))
            # ThreadPoolExecutor raises ValueError for max_workers <= 0, and
            # that error would escape main() as a traceback. Reject
            # non-positive values here so they fall back to the defaults.
            if num_tasks < 1 or num_workers < 1:
                raise ValueError("任务数量和并发数必须为正整数")
        else:
            num_tasks, num_workers = 10, 5
    except KeyboardInterrupt:
        print("\n\n用户中断")
        return 1
    except Exception as e:
        # Best effort: any bad or missing input falls back to the defaults.
        print(f"\n输入错误: {e},使用默认配置 (10 任务, 5 并发)")
        num_tasks, num_workers = 10, 5

    # Run the test; the exit code reflects whether every task succeeded.
    try:
        results = run_concurrent_test(num_tasks, num_workers)
        return 0 if all(r["success"] for r in results) else 1
    except KeyboardInterrupt:
        print("\n\n用户中断")
        return 1
if __name__ == "__main__":
    # sys.exit rather than the site-provided exit(): the latter exists only for
    # the interactive interpreter and is missing when Python runs with -S.
    sys.exit(main())
|