#!/usr/bin/env python3
"""
End-to-end pipeline test: parse-service -> embedding-api -> Elasticsearch.

For every path in ``TEST_FILES`` the script asks parse-service to parse the
file and then posts the parsed content to schedule-embedding-api for
vectorization and indexing. The process exit code is 0 only when every file
went through both steps successfully.
"""

import requests
import json
import time
import sys
import os
from typing import Optional

# Service endpoints
PARSE_SERVICE_URL = "http://localhost:8000"
EMBEDDING_API_URL = "http://localhost:8084"

# Files pushed through the pipeline
TEST_FILES = [
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
]


def check_services() -> bool:
    """Check that both backing services answer their health endpoints.

    Returns:
        True when parse-service replies 200 on ``/health`` and
        schedule-embedding-api replies 200 on ``/actuator/health`` with a JSON
        body whose ``status`` is ``"UP"``; False otherwise. A diagnostic line
        is printed for every outcome.
    """
    print("=" * 60)
    print("检查服务状态...")
    print("=" * 60)

    # parse-service
    try:
        response = requests.get(f"{PARSE_SERVICE_URL}/health", timeout=5)
    except Exception as e:
        # Any failure to reach the service is reported, not raised, so the
        # caller can exit with a readable message.
        print(f"❌ parse-service 不可达: {e}")
        print(" 请先启动: cd parse-service && python main.py")
        return False
    if response.status_code != 200:
        print(f"❌ parse-service 异常: {response.status_code}")
        return False
    print("✅ parse-service 正常")

    # schedule-embedding-api
    try:
        response = requests.get(f"{EMBEDDING_API_URL}/actuator/health", timeout=5)
        if response.status_code != 200:
            print(f"❌ schedule-embedding-api 异常: {response.status_code}")
            return False
        # Decoding stays inside the try so a non-JSON body is reported the
        # same way as an unreachable service.
        data = response.json()
        if data.get("status") != "UP":
            print(f"❌ schedule-embedding-api 状态异常: {data}")
            return False
        print("✅ schedule-embedding-api 正常")
    except Exception as e:
        print(f"❌ schedule-embedding-api 不可达: {e}")
        print(" 请先启动 schedule-embedding-api")
        return False

    print()
    return True


def parse_file(file_path: str, task_id: str) -> Optional[dict]:
    """Ask parse-service to parse the file at ``file_path``.

    Args:
        file_path: Path of the file, sent as the ``file_path`` query
            parameter of ``/api/v1/parse/path``.
        task_id: Identifier of this run; only printed here.

    Returns:
        The ``data`` object of the service response on success, or None when
        the HTTP status is not 200, the response ``code`` is not 200, the
        ``data`` payload is missing, or the call raised.
    """
    print(" 步骤 1/2: 调用 parse-service 解析...")
    print(f" 文件: {file_path}")
    print(f" 任务ID: {task_id}")

    url = f"{PARSE_SERVICE_URL}/api/v1/parse/path"

    try:
        # Pass the path via ``params`` so it is percent-encoded; a raw path
        # containing '&', '#', '+' or '%' would otherwise corrupt the query.
        response = requests.post(
            url,
            params={"file_path": file_path},
            json={},
            timeout=300,
        )

        if response.status_code != 200:
            print(f" ❌ HTTP错误: {response.status_code}")
            print(f" {response.text}")
            return None

        data = response.json()

        if data.get("code") != 200:
            print(f" ❌ 解析失败: {data.get('message')}")
            return None

        result = data.get("data")
        if not isinstance(result, dict):
            # code == 200 without a usable payload is still a failure; report
            # it before claiming success.
            print(" ❌ 解析失败: 响应缺少 data 字段")
            return None

        print(" ✅ 解析成功!")
        print(f" - 文件类型: {result.get('file_type')}")
        # ``or ""`` also covers an explicit null content field.
        print(f" - 内容长度: {len(result.get('content') or '')} 字符")
        print(f" - 解析耗时: {result.get('parse_time_ms')} ms")

        return result

    except Exception as e:
        # Best effort: report the failure for this file and let the caller
        # move on to the next one.
        print(f" ❌ 调用异常: {e}")
        return None


def index_document(task_id: str, file_path: str, parse_result: dict) -> bool:
    """Send a parsed document to embedding-api for vectorization and indexing.

    Args:
        task_id: Sent as ``docId``.
        file_path: Sent as ``filePath``; its base name is sent as
            ``fileName``.
        parse_result: The dict returned by ``parse_file``. Its ``content``,
            ``file_type`` and ``metadata`` entries are forwarded, and
            ``metadata["file_size"]`` becomes ``fileSize`` when present.

    Returns:
        True when the service answers HTTP 200 with a truthy ``success``
        field, False otherwise (including when the call raised).
    """
    print(" 步骤 2/2: 调用 embedding-api 向量化...")

    url = f"{EMBEDDING_API_URL}/api/v1/documents/index"

    metadata = parse_result.get("metadata", {})

    request_body = {
        "docId": task_id,
        "fileName": os.path.basename(file_path),
        "fullText": parse_result.get("content", ""),
        "filePath": file_path,
        "fileType": parse_result.get("file_type"),
        "metadata": metadata,
    }

    # fileSize is optional and only forwarded when the parser reported it.
    if metadata and "file_size" in metadata:
        request_body["fileSize"] = metadata["file_size"]

    try:
        response = requests.post(
            url,
            json=request_body,
            headers={"Content-Type": "application/json"},
            timeout=120,
        )

        if response.status_code != 200:
            print(f" ❌ HTTP错误: {response.status_code}")
            print(f" {response.text}")
            return False

        data = response.json()

        if data.get("success"):
            print(" ✅ 向量化并入库成功!")
            return True

        print(f" ❌ 向量化失败: {data.get('message')}")
        return False

    except Exception as e:
        # Best effort: report the failure for this file and let the caller
        # move on to the next one.
        print(f" ❌ 调用异常: {e}")
        return False


def main() -> int:
    """Run the pipeline over ``TEST_FILES`` and print a summary.

    Returns:
        0 when every file was parsed and indexed, 1 when the service check
        failed or at least one file failed.
    """
    print("\n")
    print("╔" + "=" * 58 + "╗")
    print("║" + " " * 15 + "多模态解析 - 完整流程测试" + " " * 15 + "║")
    print("╚" + "=" * 58 + "╝")
    print()

    # Nothing below can work unless both services are up.
    if not check_services():
        return 1

    success_count = 0
    fail_count = 0
    total = len(TEST_FILES)

    print("=" * 60)
    print(f"开始处理 {total} 个文件...")
    print("=" * 60)

    for i, file_path in enumerate(TEST_FILES, 1):
        print()
        print("-" * 60)
        print(f"文件 {i}/{total}: {os.path.basename(file_path)}")
        print("-" * 60)

        if not os.path.exists(file_path):
            print(f" ❌ 文件不存在: {file_path}")
            fail_count += 1
            continue

        # The loop index keeps IDs distinct within the same second.
        task_id = f"test-{int(time.time())}-{i}"

        # 1. Parse the file
        parse_result = parse_file(file_path, task_id)
        if not parse_result:
            fail_count += 1
            continue

        # 2. Vectorize and index
        if index_document(task_id, file_path, parse_result):
            success_count += 1
        else:
            fail_count += 1

        # Short pause before the next file.
        time.sleep(1)

    # Summary
    print()
    print("=" * 60)
    print("测试总结")
    print("=" * 60)
    print(f"总计: {total}")
    print(f"成功: {success_count}")
    print(f"失败: {fail_count}")

    if fail_count == 0:
        print()
        print("🎉 所有测试通过!")
    else:
        print()
        print("⚠️ 部分测试失败")

    print("=" * 60)
    print()

    return 0 if fail_count == 0 else 1


if __name__ == "__main__":
    sys.exit(main())