|
|
преди 1 седмица | |
|---|---|---|
| .idea | преди 2 седмици | |
| docs | преди 1 седмица | |
| duomotai | преди 1 седмица | |
| parse-service | преди 1 седмица | |
| schedule-admin | преди 1 седмица | |
| schedule-embedding-api | преди 1 седмица | |
| schedule-flink | преди 1 седмица | |
| .gitignore | преди 2 седмици | |
| Flink_Docker_使用指南.md | преди 1 седмица | |
| README.md | преди 1 седмица | |
| docker-compose-flink.yml | преди 1 седмица | |
| docker-compose.yml | преди 1 седмица | |
| pom.xml | преди 1 седмица | |
| send_task_to_kafka.py | преди 1 седмица | |
| send_task_to_kafka_by_type.py | преди 1 седмица | |
| test_concurrent.py | преди 1 седмица | |
| test_full_pipeline_final.py | преди 1 седмица | |
| test_kafka_mode.py | преди 1 седмица | |
| test_multimodal_search.py | преди 1 седмица | |
| test_search_final.py | преди 1 седмица | |
| 多模态数据前置分类方案.md | преди 1 седмица | |
| 多模态智能检索方案.md | преди 1 седмица | |
| 完整流程测试指南.md | преди 1 седмица |
基于 Flink + FastAPI + Elasticsearch 的多模态文档解析与智能检索系统。
本系统实现了多模态文档(PDF/Word/图片/音频/视频)的解析、向量化存储和智能检索功能。
┌─────────────────────────────────────────────────────────────┐
│ 数据接入层 │
│ (对象存储 / Kafka / 本地文件) │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────┐ ┌─────────────────┐
│ 批量处理 │ │ 实时流 │ │ 直接调用 │
│ (Flink) │ │ (Flink) │ │ (HTTP API) │
└──────┬───────┘ └────┬─────┘ └───────┬─────────┘
│ │ │
└───────────────┼──────────────┘
▼
┌───────────────────────────────┐
│ parse-service │
│ (Python FastAPI) │
│ - PDF/Word/图片解析 │
│ - 音频/视频解析 │
└───────────────┬───────────────┘
│
┌───────────┴───────────┐
▼ ▼
┌───────────────┐ ┌───────────────────────┐
│ Kafka │ │ schedule-embedding-api│
│ (可选解耦) │ │ (向量化 + ES 存储) │
└───────┬───────┘ └───────────┬───────────┘
│ │
└───────────┬────────────┘
▼
┌───────────────────────┐
│ Elasticsearch 8.x │
│ (向量存储 + KNN 搜索) │
└───────────────────────┘
方式一:直接 HTTP 调用(简单模式)
测试脚本 → parse-service → schedule-embedding-api → Elasticsearch
方式二:Kafka 解耦 + Flink 批流一体(推荐生产模式)
数据生产者 → Kafka → Flink Job → parse-service → schedule-embedding-api → Elasticsearch
Flink 定位:
| 模块 | 技术栈 | 端口 | 说明 |
|---|---|---|---|
| parse-service | Python + FastAPI | 8000 | 多模态文档解析服务 |
| schedule-embedding-api | Java 17 + Spring Boot | 8084 | 向量化存储与检索服务 |
| schedule-flink | Java 17 + Flink 1.18 | - | 批流一体处理模块(可选) |
| schedule-admin | Java 17 + Spring Boot | - | 管理后台 |
POST /api/v1/search):语义相似度搜索POST /api/v1/search/hybrid):向量搜索 + 业务字段过滤POST /api/v2/multimodal/search):统一多模态查询接口(框架已搭建)MultimodalParseJob.java - 批流一体主 JobPureJavaTest.java - 不依赖 Flink 的端到端测试SimpleParseServiceClient.java、SimpleEmbeddingApiClient.java - 无日志依赖的轻量客户端| 脚本 | 说明 | 使用方法 |
|---|---|---|
test_full_pipeline_final.py |
完整流程测试:parse-service → embedding-api → ES | python3 test_full_pipeline_final.py |
test_search_final.py |
向量检索功能测试 | python3 test_search_final.py |
test_multimodal_search.py |
多模态检索 API 测试 | python3 test_multimodal_search.py |
test_concurrent.py |
并发测试:测试 parse-service 并发处理能力 | python3 test_concurrent.py |
根据《多模态智能检索方案.md》,后续可实现:
根据《多模态数据前置分类方案.md》,后续可实现:
cd parse-service
python main.py
服务地址: http://localhost:8000
健康检查: curl http://localhost:8000/health
cd schedule-embedding-api
jdk17 && mvn spring-boot:run
服务地址: http://localhost:8084
健康检查: curl http://localhost:8084/actuator/health
python3 test_full_pipeline_final.py
python3 test_concurrent.py
可选测试模式:
无需部署 Flink 集群! 可以直接本地运行:
cd schedule-flink
# 纯 Java 测试(不依赖 Flink,验证端到端流程)
jdk17 && mvn exec:java -Dexec.mainClass="com.yusys.flink.PureJavaTest"
# 或本地 Flink 模式(需要解决 logback 依赖冲突)
jdk17 && mvn exec:java -Dexec.mainClass="com.yusys.flink.MultimodalParseJob"
Flink 部署模式(可选):
| 接口 | 方法 | 说明 |
|---|---|---|
/health |
GET | 健康检查 |
/api/v1/parse/path |
POST | 解析文件(通过路径) |
详细文档见:schedule-embedding-api/docs/API接口文档.md
| 接口 | 方法 | 说明 |
|---|---|---|
/actuator/health |
GET | 健康检查 |
/api/v1/documents/index |
POST | 单个文档入库 |
/api/v1/documents/batch-index |
POST | 批量文档入库 |
/api/v1/search |
POST | 向量搜索 |
/api/v1/search/hybrid |
POST | 混合搜索 |
/api/v2/multimodal/search |
POST | 多模态统一检索 |
用途:测试完整的解析流程(parse-service → embedding-api → Elasticsearch)
使用方法:
python3 test_full_pipeline_final.py
功能:
用途:测试向量检索功能
使用方法:
python3 test_search_final.py
功能:
用途:测试多模态检索 API(v2 版本)
使用方法:
python3 test_multimodal_search.py
功能:
用途:测试 parse-service 的并发处理能力
使用方法:
python3 test_concurrent.py
功能:
测试指标:
four-level-schedule/
├── README.md # 本文件
├── pom.xml # 父 POM
├── docker-compose.yml # Docker Compose 配置
├── 多模态数据前置分类方案.md # 前置分类方案文档
├── 多模态智能检索方案.md # 智能检索方案文档
│
├── parse-service/ # 多模态解析服务
│ ├── main.py # 入口文件
│ ├── core/ # 核心路由
│ ├── parsers/ # 解析器
│ ├── models/ # 数据模型
│ └── utils/ # 工具类
│
├── schedule-embedding-api/ # 向量化存储与检索服务
│ ├── pom.xml
│ ├── docs/ # API 文档
│ └── src/main/java/cn/com/yusys/manager/
│ ├── controller/ # 控制器
│ │ ├── VectorSearchController.java
│ │ └── MultimodalSearchController.java # 多模态检索(新增)
│ ├── service/ # 服务层
│ ├── repository/ # 数据访问层
│ └── model/
│ ├── dto/ # DTO
│ │ ├── MultimodalSearchRequest.java # 多模态请求(新增)
│ │ └── MultimodalSearchResponse.java # 多模态响应(新增)
│ └── entity/
│
├── schedule-flink/ # Flink 批流一体处理模块
│ ├── pom.xml
│ └── src/main/java/com/yusys/flink/
│ ├── MultimodalParseJob.java # Flink 主 Job
│ ├── SimpleLocalTest.java # 简单本地测试
│ ├── PureJavaTest.java # 纯 Java 测试(无 Flink)
│ └── client/ # HTTP 客户端
│
├── schedule-admin/ # 管理后台
│
├── test_full_pipeline_final.py # 完整流程测试
├── test_search_final.py # 向量检索测试
└── test_multimodal_search.py # 多模态检索测试
见:parse-service/config.py
主要配置项:
见:schedule-embedding-api/src/main/resources/application.yml
主要配置项:
A: 检查 Python 依赖是否安装:
cd parse-service
pip install -r requirements.txt
A: 检查 Elasticsearch 是否启动,配置中的地址是否正确。
A: 确保先有文档入库,可以运行 test_full_pipeline_final.py 来测试入库。
本项目采用内部许可证,未经授权不得用于商业用途。
如有问题或建议,请联系项目维护者。