完整流程测试指南.md 5.4 KB

完整流程测试指南

完整流程架构

┌─────────────────────────────────────────────────────────────────────┐
│                        完整数据流程                                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  1. 发送任务消息              send_task_to_kafka.py                  │
│              │                                                         │
│              ▼                                                         │
│  2. Kafka: parse-tasks       (任务队列)                              │
│              │                                                         │
│              ▼                                                         │
│  3. Flink (或模拟器)         消费任务,调用 parse-service            │
│              │                                                         │
│              ▼                                                         │
│  4. parse-service             解析文件 (PDF/Word/...)                │
│              │                                                         │
│              ▼                                                         │
│  5. Kafka: embedding-topic   (解析结果队列)                          │
│              │                                                         │
│              ▼                                                         │
│  6. schedule-embedding-api   消费结果,向量化,索引到 ES             │
│              │                                                         │
│              ▼                                                         │
│  7. Elasticsearch             向量存储 + KNN 搜索                     │
│              │                                                         │
│              ▼                                                         │
│  8. 检索                      test_search_final.py                    │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

测试步骤

前置条件

确保以下服务已启动:

  • ✅ parse-service (端口 8000,Kafka 已启用)
  • ✅ schedule-embedding-api (端口 8084)
  • ✅ Kafka (10.192.72.13:9092)
  • ✅ MySQL
  • ✅ Elasticsearch

步骤 1: 启动 Flink 消费者模拟器

打开一个新终端窗口,运行:

cd /Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule
python3 flink_consumer_simulator.py

这个脚本会:

  • parse-tasks topic 消费任务
  • 调用 parse-service 解析文件
  • 将解析结果发送到 embedding-topic

步骤 2: 发送任务消息到 Kafka

打开另一个终端窗口,运行:

cd /Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule
python3 send_task_to_kafka.py

这个脚本会:

  • 发送测试文件路径到 parse-tasks topic

步骤 3: 观察处理过程

在 Flink 消费者模拟器的终端窗口中,你应该看到:

收到任务: taskId=xxx, file=5.docx
  调用 parse-service 解析...
    ✅ 解析成功!
  发送结果到 Kafka...
    ✅ 结果已发送到 topic: embedding-topic

步骤 4: 验证文档已索引

运行向量搜索验证:

python3 test_search_final.py

你应该能看到刚刚通过完整流程处理的文档。


各组件说明

组件 脚本/服务 说明
任务生产者 send_task_to_kafka.py 发送文件路径到 parse-tasks
任务队列 Kafka topic: parse-tasks 存储待解析的文件任务
Flink 消费者 flink_consumer_simulator.py 模拟 Flink,消费任务并解析
解析服务 parse-service (8000) 实际解析文件
结果队列 Kafka topic: embedding-topic 存储解析结果
向量化服务 schedule-embedding-api (8084) 消费结果,向量化,索引
向量存储 Elasticsearch 存储向量和文档
检索测试 test_search_final.py 验证检索功能

测试脚本汇总

脚本 说明
test_full_pipeline_final.py HTTP 直接调用模式 (简单)
test_kafka_mode.py Kafka 解耦模式 (parse-service 直接发 Kafka)
send_task_to_kafka.py 发送任务到 parse-tasks (完整流程第一步)
flink_consumer_simulator.py Flink 消费者模拟器 (完整流程核心)
test_concurrent.py 并发测试
test_search_final.py 向量检索测试
test_multimodal_search.py 多模态检索 API 测试

下一步

当前使用 flink_consumer_simulator.py 模拟 Flink。

真正的 Flink Job 需要解决:

  1. logback 版本冲突问题
  2. Flink Kafka 连接器依赖问题

然后就可以使用 schedule-flink 模块中的 MultimodalParseJob 了。