┌─────────────────────────────────────────────────────────────────────┐
│ 完整数据流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 发送任务消息 send_task_to_kafka.py │
│ │ │
│ ▼ │
│ 2. Kafka: parse-tasks (任务队列) │
│ │ │
│ ▼ │
│ 3. Flink (或模拟器) 消费任务,调用 parse-service │
│ │ │
│ ▼ │
│ 4. parse-service 解析文件 (PDF/Word/...) │
│ │ │
│ ▼ │
│ 5. Kafka: embedding-topic (解析结果队列) │
│ │ │
│ ▼ │
│ 6. schedule-embedding-api 消费结果,向量化,索引到 ES │
│ │ │
│ ▼ │
│ 7. Elasticsearch 向量存储 + KNN 搜索 │
│ │ │
│ ▼ │
│ 8. 检索 test_search_final.py │
│ │
└─────────────────────────────────────────────────────────────────────┘
确保以下服务已启动:
打开一个新终端窗口,运行:
cd /Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule
python3 flink_consumer_simulator.py
这个脚本会:
parse-tasks topic 消费任务embedding-topic打开另一个终端窗口,运行:
cd /Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule
python3 send_task_to_kafka.py
这个脚本会:
parse-tasks topic在 Flink 消费者模拟器的终端窗口中,你应该看到:
收到任务: taskId=xxx, file=5.docx
调用 parse-service 解析...
✅ 解析成功!
发送结果到 Kafka...
✅ 结果已发送到 topic: embedding-topic
运行向量搜索验证:
python3 test_search_final.py
你应该能看到刚刚通过完整流程处理的文档。
| 组件 | 脚本/服务 | 说明 |
|---|---|---|
| 任务生产者 | send_task_to_kafka.py |
发送文件路径到 parse-tasks |
| 任务队列 | Kafka topic: parse-tasks |
存储待解析的文件任务 |
| Flink 消费者 | flink_consumer_simulator.py |
模拟 Flink,消费任务并解析 |
| 解析服务 | parse-service (8000) | 实际解析文件 |
| 结果队列 | Kafka topic: embedding-topic |
存储解析结果 |
| 向量化服务 | schedule-embedding-api (8084) | 消费结果,向量化,索引 |
| 向量存储 | Elasticsearch | 存储向量和文档 |
| 检索测试 | test_search_final.py |
验证检索功能 |
| 脚本 | 说明 |
|---|---|
test_full_pipeline_final.py |
HTTP 直接调用模式 (简单) |
test_kafka_mode.py |
Kafka 解耦模式 (parse-service 直接发 Kafka) |
send_task_to_kafka.py |
发送任务到 parse-tasks (完整流程第一步) |
flink_consumer_simulator.py |
Flink 消费者模拟器 (完整流程核心) |
test_concurrent.py |
并发测试 |
test_search_final.py |
向量检索测试 |
test_multimodal_search.py |
多模态检索 API 测试 |
当前使用 flink_consumer_simulator.py 模拟 Flink。
真正的 Flink Job 需要解决:
然后就可以使用 schedule-flink 模块中的 MultimodalParseJob 了。