| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- #!/usr/bin/env python3
- """
- 完整流程测试:parse-service → embedding-api → Elasticsearch
- """
- import requests
- import json
- import time
- import sys
- import os
# Service endpoints. Each base URL may be overridden with an environment
# variable of the same name, so the script is not tied to one local setup.
# An unset or empty variable falls back to the localhost default. A trailing
# slash is stripped because every request path appended below starts with "/".
PARSE_SERVICE_URL = (
    os.environ.get("PARSE_SERVICE_URL") or "http://localhost:8000"
).rstrip("/")
EMBEDDING_API_URL = (
    os.environ.get("EMBEDDING_API_URL") or "http://localhost:8084"
).rstrip("/")

# Files pushed through the pipeline, in order.
# NOTE(review): these are absolute paths on one developer's machine; main()
# counts a path that does not exist locally as a failed file.
TEST_FILES = [
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
]
def check_services():
    """Verify that both backend services answer their health endpoints.

    parse-service is probed at ``/health`` and must return HTTP 200.
    schedule-embedding-api is probed at ``/actuator/health`` and must return
    HTTP 200 with a JSON body whose ``status`` field equals ``"UP"``.
    Progress and failure details are printed to stdout.

    Returns:
        True when both probes succeed, False as soon as one of them fails
        (bad status, unexpected body, or any exception while calling).
    """
    rule = "=" * 60
    print(rule)
    print("检查服务状态...")
    print(rule)

    # Probe parse-service.
    try:
        parse_health = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
        if parse_health.status_code != 200:
            print(f"❌ parse-service 异常: {parse_health.status_code}")
            return False
        print("✅ parse-service 正常")
    except Exception as exc:
        print(f"❌ parse-service 不可达: {exc}")
        print(" 请先启动: cd parse-service && python main.py")
        return False

    # Probe schedule-embedding-api; the JSON decode stays inside the try so a
    # non-JSON body is reported as "unreachable" like any other call failure.
    try:
        embed_health = requests.get(
            f"{EMBEDDING_API_URL}/actuator/health", timeout=5
        )
        if embed_health.status_code != 200:
            print(f"❌ schedule-embedding-api 异常: {embed_health.status_code}")
            return False
        health_body = embed_health.json()
        if health_body.get("status") != "UP":
            print(f"❌ schedule-embedding-api 状态异常: {health_body}")
            return False
        print("✅ schedule-embedding-api 正常")
    except Exception as exc:
        print(f"❌ schedule-embedding-api 不可达: {exc}")
        print(" 请先启动 schedule-embedding-api")
        return False

    print()
    return True
def parse_file(file_path, task_id):
    """Ask parse-service to parse the file at ``file_path``.

    Sends ``POST {PARSE_SERVICE_URL}/api/v1/parse/path`` with the path as the
    ``file_path`` query parameter and an empty JSON body, and prints a short
    summary of the outcome.

    Args:
        file_path: Path of the file to parse, sent to the service as-is.
        task_id: Identifier of this run; only echoed in the progress output.

    Returns:
        The ``data`` dict from the service response on success. ``None`` when
        the HTTP status is not 200, the response ``code`` is not 200, ``data``
        is missing or not an object, or the call raises.
    """
    print(" 步骤 1/2: 调用 parse-service 解析...")
    print(f" 文件: {file_path}")
    print(f" 任务ID: {task_id}")

    url = f"{PARSE_SERVICE_URL}/api/v1/parse/path"
    try:
        # Hand the path to requests via ``params`` so it is percent-encoded.
        # Interpolating it into the URL would let characters such as "&",
        # "#", "+" or "%" in a file name truncate or corrupt the parameter.
        response = requests.post(
            url,
            params={"file_path": file_path},
            json={},
            timeout=300,
        )
        if response.status_code != 200:
            print(f" ❌ HTTP错误: {response.status_code}")
            print(f" {response.text}")
            return None

        data = response.json()
        if data.get("code") != 200:
            print(f" ❌ 解析失败: {data.get('message')}")
            return None

        result = data.get("data")
        if not isinstance(result, dict):
            # A "successful" envelope without a payload is a parse failure,
            # not a transport error; report it as such.
            print(" ❌ 解析失败: 响应缺少 data 字段")
            return None

        # ``content`` may be present but null; treat that as empty text.
        content = result.get("content") or ""
        print(" ✅ 解析成功!")
        print(f" - 文件类型: {result.get('file_type')}")
        print(f" - 内容长度: {len(content)} 字符")
        print(f" - 解析耗时: {result.get('parse_time_ms')} ms")
        return result
    except Exception as e:
        # Best-effort script boundary: report the failure and let the caller
        # count this file as failed instead of aborting the whole run.
        print(f" ❌ 调用异常: {e}")
        return None
def index_document(task_id, file_path, parse_result):
    """Send a parsed document to embedding-api for vectorisation and indexing.

    Builds the request body from ``parse_result`` and posts it as JSON to
    ``{EMBEDDING_API_URL}/api/v1/documents/index``.

    Args:
        task_id: Value sent as ``docId``.
        file_path: Original file path; sent as ``filePath`` and its base name
            as ``fileName``.
        parse_result: Dict returned by the parse step; ``content``,
            ``file_type`` and ``metadata`` are read from it.

    Returns:
        True when the service answers HTTP 200 with a truthy ``success``
        field, False otherwise (including any exception during the call).
    """
    print(" 步骤 2/2: 调用 embedding-api 向量化...")

    doc_metadata = parse_result.get("metadata", {})
    payload = {
        "docId": task_id,
        "fileName": os.path.basename(file_path),
        "fullText": parse_result.get("content", ""),
        "filePath": file_path,
        "fileType": parse_result.get("file_type"),
        "metadata": doc_metadata,
    }
    # fileSize is only sent when the parser reported one in its metadata.
    if doc_metadata and "file_size" in doc_metadata:
        payload["fileSize"] = doc_metadata["file_size"]

    endpoint = f"{EMBEDDING_API_URL}/api/v1/documents/index"
    try:
        reply = requests.post(
            endpoint,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=120,
        )
        if reply.status_code != 200:
            print(f" ❌ HTTP错误: {reply.status_code}")
            print(f" {reply.text}")
            return False

        reply_body = reply.json()
        if not reply_body.get("success"):
            print(f" ❌ 向量化失败: {reply_body.get('message')}")
            return False

        print(" ✅ 向量化并入库成功!")
        return True
    except Exception as exc:
        print(f" ❌ 调用异常: {exc}")
        return False
def main():
    """Run the full pipeline over every entry in ``TEST_FILES``.

    After a service health check, each file is parsed and then indexed; a
    file counts as failed when it does not exist locally, when parsing
    yields no result, or when indexing reports failure. A summary is printed
    at the end.

    Returns:
        0 when no file failed, 1 when the health check fails or at least one
        file failed.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    print("\n")
    print("╔" + "=" * 58 + "╗")
    print("║" + " " * 15 + "多模态解析 - 完整流程测试" + " " * 15 + "║")
    print("╚" + "=" * 58 + "╝")
    print()

    # Nothing can work without both services, so bail out early.
    if not check_services():
        return 1

    total = len(TEST_FILES)
    succeeded = 0
    failed = 0

    print(heavy_rule)
    print(f"开始处理 {total} 个文件...")
    print(heavy_rule)

    for position, file_path in enumerate(TEST_FILES, 1):
        print()
        print(light_rule)
        print(f"文件 {position}/{total}: {os.path.basename(file_path)}")
        print(light_rule)

        if not os.path.exists(file_path):
            print(f" ❌ 文件不存在: {file_path}")
            failed += 1
            continue

        # Task id combines the current epoch second with the file's position.
        task_id = f"test-{int(time.time())}-{position}"

        # Step 1: parse the file.
        parsed = parse_file(file_path, task_id)
        if not parsed:
            failed += 1
            continue

        # Step 2: vectorise and index the parsed content.
        indexed = index_document(task_id, file_path, parsed)
        if indexed:
            succeeded += 1
        else:
            failed += 1

        # Brief pause before moving on to the next file.
        time.sleep(1)

    all_passed = failed == 0

    print()
    print(heavy_rule)
    print("测试总结")
    print(heavy_rule)
    print(f"总计: {total}")
    print(f"成功: {succeeded}")
    print(f"失败: {failed}")
    print()
    print("🎉 所有测试通过!" if all_passed else "⚠️ 部分测试失败")
    print(heavy_rule)
    print()

    return 0 if all_passed else 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|