# 完整流程测试指南 ## 完整流程架构 ``` ┌─────────────────────────────────────────────────────────────────────┐ │ 完整数据流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 发送任务消息 send_task_to_kafka.py │ │ │ │ │ ▼ │ │ 2. Kafka: parse-tasks (任务队列) │ │ │ │ │ ▼ │ │ 3. Flink (或模拟器) 消费任务,调用 parse-service │ │ │ │ │ ▼ │ │ 4. parse-service 解析文件 (PDF/Word/...) │ │ │ │ │ ▼ │ │ 5. Kafka: embedding-topic (解析结果队列) │ │ │ │ │ ▼ │ │ 6. schedule-embedding-api 消费结果,向量化,索引到 ES │ │ │ │ │ ▼ │ │ 7. Elasticsearch 向量存储 + KNN 搜索 │ │ │ │ │ ▼ │ │ 8. 检索 test_search_final.py │ │ │ └─────────────────────────────────────────────────────────────────────┘ ``` --- ## 测试步骤 ### 前置条件 确保以下服务已启动: - ✅ parse-service (端口 8000,Kafka 已启用) - ✅ schedule-embedding-api (端口 8084) - ✅ Kafka (10.192.72.13:9092) - ✅ MySQL - ✅ Elasticsearch --- ### 步骤 1: 启动 Flink 消费者模拟器 打开一个新终端窗口,运行: ```bash cd /Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule python3 flink_consumer_simulator.py ``` 这个脚本会: - 从 `parse-tasks` topic 消费任务 - 调用 parse-service 解析文件 - 将解析结果发送到 `embedding-topic` --- ### 步骤 2: 发送任务消息到 Kafka 打开另一个终端窗口,运行: ```bash cd /Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule python3 send_task_to_kafka.py ``` 这个脚本会: - 发送测试文件路径到 `parse-tasks` topic --- ### 步骤 3: 观察处理过程 在 Flink 消费者模拟器的终端窗口中,你应该看到: ``` 收到任务: taskId=xxx, file=5.docx 调用 parse-service 解析... ✅ 解析成功! 发送结果到 Kafka... ✅ 结果已发送到 topic: embedding-topic ``` --- ### 步骤 4: 验证文档已索引 运行向量搜索验证: ```bash python3 test_search_final.py ``` 你应该能看到刚刚通过完整流程处理的文档。 --- ## 各组件说明 | 组件 | 脚本/服务 | 说明 | |------|-----------|------| | 任务生产者 | `send_task_to_kafka.py` | 发送文件路径到 parse-tasks | | 任务队列 | Kafka topic: `parse-tasks` | 存储待解析的文件任务 | | Flink 消费者 | `flink_consumer_simulator.py` | 模拟 Flink,消费任务并解析 | | 解析服务 | parse-service (8000) | 实际解析文件 | | 结果队列 | Kafka topic: `embedding-topic` | 存储解析结果 | | 向量化服务 | schedule-embedding-api (8084) | 消费结果,向量化,索引 | | 向量存储 | Elasticsearch | 存储向量和文档 | | 检索测试 | `test_search_final.py` | 验证检索功能 | --- ## 测试脚本汇总 | 脚本 | 说明 | |------|------| | `test_full_pipeline_final.py` | HTTP 直接调用模式 (简单) | | `test_kafka_mode.py` | Kafka 解耦模式 (parse-service 直接发 Kafka) | | `send_task_to_kafka.py` | 发送任务到 parse-tasks (完整流程第一步) | | `flink_consumer_simulator.py` | Flink 消费者模拟器 (完整流程核心) | | `test_concurrent.py` | 并发测试 | | `test_search_final.py` | 向量检索测试 | | `test_multimodal_search.py` | 多模态检索 API 测试 | --- ## 下一步 当前使用 `flink_consumer_simulator.py` 模拟 Flink。 真正的 Flink Job 需要解决: 1. logback 版本冲突问题 2. Flink Kafka 连接器依赖问题 然后就可以使用 `schedule-flink` 模块中的 `MultimodalParseJob` 了。