# 多模态解析系统 基于 Flink + FastAPI + Elasticsearch 的多模态文档解析与智能检索系统。 --- ## 项目简介 本系统实现了多模态文档(PDF/Word/图片/音频/视频)的解析、向量化存储和智能检索功能。 ### 核心特性 - 📄 **多模态解析**:支持 PDF、Office 文档、图片、音频、视频等多种格式 - 🔍 **智能检索**:基于向量的语义搜索,支持文本查多模态 - ⚡ **批流一体**:支持批量历史数据处理和实时流处理(Flink) - 🎯 **可扩展架构**:模块化设计,易于扩展新的解析器和检索方式 --- ## 系统架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ 数据接入层 │ │ (对象存储 / Kafka / 本地文件) │ └──────────────────────┬──────────────────────────────────────┘ │ ┌──────────────┼──────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────┐ ┌─────────────────┐ │ 批量处理 │ │ 实时流 │ │ 直接调用 │ │ (Flink) │ │ (Flink) │ │ (HTTP API) │ └──────┬───────┘ └────┬─────┘ └───────┬─────────┘ │ │ │ └───────────────┼──────────────┘ ▼ ┌───────────────────────────────┐ │ parse-service │ │ (Python FastAPI) │ │ - PDF/Word/图片解析 │ │ - 音频/视频解析 │ └───────────────┬───────────────┘ │ ┌───────────┴───────────┐ ▼ ▼ ┌───────────────┐ ┌───────────────────────┐ │ Kafka │ │ schedule-embedding-api│ │ (可选解耦) │ │ (向量化 + ES 存储) │ └───────┬───────┘ └───────────┬───────────┘ │ │ └───────────┬────────────┘ ▼ ┌───────────────────────┐ │ Elasticsearch 8.x │ │ (向量存储 + KNN 搜索) │ └───────────────────────┘ ``` ### 完整数据流程 **方式一:直接 HTTP 调用(简单模式)** ``` 测试脚本 → parse-service → schedule-embedding-api → Elasticsearch ``` **方式二:Kafka 解耦 + Flink 批流一体(推荐生产模式)** ``` 数据生产者 → Kafka → Flink Job → parse-service → schedule-embedding-api → Elasticsearch ``` **Flink 定位**: - 不取代 parse-service(核心解析逻辑保持不变) - 作为批流一体引擎,负责任务编排和并行处理 - 提供 Exactly-Once 语义、Savepoint、状态管理、背压控制 --- ## 模块说明 | 模块 | 技术栈 | 端口 | 说明 | |------|--------|------|------| | **parse-service** | Python + FastAPI | 8000 | 多模态文档解析服务 | | **schedule-embedding-api** | Java 17 + Spring Boot | 8084 | 向量化存储与检索服务 | | **schedule-flink** | Java 17 + Flink 1.18 | - | 批流一体处理模块(可选) | | **schedule-admin** | Java 17 + Spring Boot | - | 管理后台 | --- ## 已实现功能 ### ✅ 1. 多模态文档解析 + 批流一体架构 - **PDF 解析**:原生文本提取 + 扫描件 OCR - **Office 文档**:Word (.docx/.doc)、PPT、Excel 解析 - **图片解析**:OCR 文字识别 - **音频解析**:ASR 语音转文字 - **视频解析**:关键帧提取 + 音频转文字 ### ✅ 2. 向量化存储 - 基于 Qwen3-Embedding-8B 生成 4096 维向量 - 自动文本分片(Chunking) - Elasticsearch 8.x 向量存储 - 动态字段支持(任意 metadata) ### ✅ 3. 智能检索 - **向量搜索** (`POST /api/v1/search`):语义相似度搜索 - **混合搜索** (`POST /api/v1/search/hybrid`):向量搜索 + 业务字段过滤 - **多模态检索 API** (`POST /api/v2/multimodal/search`):统一多模态查询接口(框架已搭建) ### ✅ 4. 批流一体处理 (Flink) - **Flink Job 框架**:`MultimodalParseJob.java` - 批流一体主 Job - **Async I/O**:异步调用 parse-service 和 embedding-api - **纯 Java 测试**:`PureJavaTest.java` - 不依赖 Flink 的端到端测试 - **简单客户端**:`SimpleParseServiceClient.java`、`SimpleEmbeddingApiClient.java` - 无日志依赖的轻量客户端 ### ✅ 5. 测试脚本 | 脚本 | 说明 | 使用方法 | |------|------|----------| | `test_full_pipeline_final.py` | 完整流程测试:parse-service → embedding-api → ES | `python3 test_full_pipeline_final.py` | | `test_search_final.py` | 向量检索功能测试 | `python3 test_search_final.py` | | `test_multimodal_search.py` | 多模态检索 API 测试 | `python3 test_multimodal_search.py` | | `test_concurrent.py` | **并发测试**:测试 parse-service 并发处理能力 | `python3 test_concurrent.py` | --- ## 待实现功能 ### 🚧 1. 多模态检索高级功能 根据《多模态智能检索方案.md》,后续可实现: - **图片查多模态**:上传图片检索相似内容 - **图文混合查询**:文本 + 图片联合查询 - **音频/视频查询**:语音提问、视频片段检索 - **跨模态理解与推理**:接入多模态大模型(Qwen-VL 等) - **多模态答案生成**:生成带配图的结构化答案 ### 🚧 2. 多模态数据前置分类 根据《多模态数据前置分类方案.md》,后续可实现: - **三级分类机制**:数仓元数据匹配 → 规则引擎 → 轻量 AI 预识别 - **分类准确率**:≥ 98% - **人工审核兜底**:低置信度文件进入人工审核队列 ### 🚧 3. Flink 集成 - 解决 logback 版本冲突问题 - 本地 Flink Job 测试 - 批处理模式(对象存储读取) - 流处理模式(Kafka 消费) --- ## 快速开始 ### 环境要求 - **JDK**: 17+ - **Python**: 3.9+ - **Elasticsearch**: 8.x - **Kafka**: 2.8+ (可选) - **Maven**: 3.6+ ### 1. 启动 parse-service ```bash cd parse-service python main.py ``` 服务地址: http://localhost:8000 健康检查: `curl http://localhost:8000/health` ### 2. 启动 schedule-embedding-api ```bash cd schedule-embedding-api jdk17 && mvn spring-boot:run ``` 服务地址: http://localhost:8084 健康检查: `curl http://localhost:8084/actuator/health` ### 3. 运行完整流程测试(HTTP 直接调用) ```bash python3 test_full_pipeline_final.py ``` ### 4. 运行并发测试 ```bash python3 test_concurrent.py ``` 可选测试模式: - 小规模 (5 任务, 2 并发) - 中规模 (10 任务, 5 并发) - 大规模 (20 任务, 10 并发) - 自定义 ### 5. 运行 Flink Job(可选) **无需部署 Flink 集群!** 可以直接本地运行: ```bash cd schedule-flink # 纯 Java 测试(不依赖 Flink,验证端到端流程) jdk17 && mvn exec:java -Dexec.mainClass="com.yusys.flink.PureJavaTest" # 或本地 Flink 模式(需要解决 logback 依赖冲突) jdk17 && mvn exec:java -Dexec.mainClass="com.yusys.flink.MultimodalParseJob" ``` **Flink 部署模式(可选)**: - 开发测试:本地运行(无需部署) - 小集群:Standalone 模式 - 生产环境:Docker Compose 或 Kubernetes --- ## API 文档 ### parse-service API | 接口 | 方法 | 说明 | |------|------|------| | `/health` | GET | 健康检查 | | `/api/v1/parse/path` | POST | 解析文件(通过路径) | ### schedule-embedding-api API 详细文档见:`schedule-embedding-api/docs/API接口文档.md` | 接口 | 方法 | 说明 | |------|------|------| | `/actuator/health` | GET | 健康检查 | | `/api/v1/documents/index` | POST | 单个文档入库 | | `/api/v1/documents/batch-index` | POST | 批量文档入库 | | `/api/v1/search` | POST | 向量搜索 | | `/api/v1/search/hybrid` | POST | 混合搜索 | | `/api/v2/multimodal/search` | POST | 多模态统一检索 | --- ## 测试脚本使用说明 ### 1. test_full_pipeline_final.py **用途**:测试完整的解析流程(parse-service → embedding-api → Elasticsearch) **使用方法**: ```bash python3 test_full_pipeline_final.py ``` **功能**: - 检查 parse-service 和 schedule-embedding-api 健康状态 - 解析测试文件(5.docx、20以内口算.pdf) - 调用 embedding-api 向量化并入库 - 显示测试总结 --- ### 2. test_search_final.py **用途**:测试向量检索功能 **使用方法**: ```bash python3 test_search_final.py ``` **功能**: - 测试查询:"数学"、"口算题"、"一年级" - 显示 Top-K 检索结果 - 显示相似度分数和内容片段 --- ### 3. test_multimodal_search.py **用途**:测试多模态检索 API(v2 版本) **使用方法**: ```bash python3 test_multimodal_search.py ``` **功能**: - 测试 TEXT_ONLY 模式(兼容现有能力) - 测试 TEXT_TO_MULTIMODAL 模式 - 测试 IMAGE_TO_MULTIMODAL 模式(待实现) - 显示完整响应 --- ### 4. test_concurrent.py ⭐ **用途**:测试 parse-service 的并发处理能力 **使用方法**: ```bash python3 test_concurrent.py ``` **功能**: - 检查 parse-service 和 schedule-embedding-api 健康状态 - 支持多种并发模式(小规模/中规模/大规模/自定义) - 使用线程池并发调用 parse-service - 实时显示每个任务的执行状态 - 统计吞吐量、平均耗时、成功/失败数量 **测试指标**: - 总任务数 - 并发数(工作线程数) - 成功/失败数量 - 总耗时 - 吞吐量(任务/秒) - 平均解析耗时 - 平均向量化耗时 - 平均总耗时 ## 项目结构 ``` four-level-schedule/ ├── README.md # 本文件 ├── pom.xml # 父 POM ├── docker-compose.yml # Docker Compose 配置 ├── 多模态数据前置分类方案.md # 前置分类方案文档 ├── 多模态智能检索方案.md # 智能检索方案文档 │ ├── parse-service/ # 多模态解析服务 │ ├── main.py # 入口文件 │ ├── core/ # 核心路由 │ ├── parsers/ # 解析器 │ ├── models/ # 数据模型 │ └── utils/ # 工具类 │ ├── schedule-embedding-api/ # 向量化存储与检索服务 │ ├── pom.xml │ ├── docs/ # API 文档 │ └── src/main/java/cn/com/yusys/manager/ │ ├── controller/ # 控制器 │ │ ├── VectorSearchController.java │ │ └── MultimodalSearchController.java # 多模态检索(新增) │ ├── service/ # 服务层 │ ├── repository/ # 数据访问层 │ └── model/ │ ├── dto/ # DTO │ │ ├── MultimodalSearchRequest.java # 多模态请求(新增) │ │ └── MultimodalSearchResponse.java # 多模态响应(新增) │ └── entity/ │ ├── schedule-flink/ # Flink 批流一体处理模块 │ ├── pom.xml │ └── src/main/java/com/yusys/flink/ │ ├── MultimodalParseJob.java # Flink 主 Job │ ├── SimpleLocalTest.java # 简单本地测试 │ ├── PureJavaTest.java # 纯 Java 测试(无 Flink) │ └── client/ # HTTP 客户端 │ ├── schedule-admin/ # 管理后台 │ ├── test_full_pipeline_final.py # 完整流程测试 ├── test_search_final.py # 向量检索测试 └── test_multimodal_search.py # 多模态检索测试 ``` --- ## 配置说明 ### parse-service 配置 见:`parse-service/config.py` 主要配置项: - MinerU API 地址 - Qwen3-VL API 地址 - Qwen3-ASR API 地址 - Kafka 配置(可选) ### schedule-embedding-api 配置 见:`schedule-embedding-api/src/main/resources/application.yml` 主要配置项: - Elasticsearch 连接信息 - Embedding API 地址 - MySQL 连接信息 - Kafka 配置(可选) --- ## 常见问题 ### Q: parse-service 启动失败? A: 检查 Python 依赖是否安装: ```bash cd parse-service pip install -r requirements.txt ``` ### Q: schedule-embedding-api 连接 ES 失败? A: 检查 Elasticsearch 是否启动,配置中的地址是否正确。 ### Q: 向量搜索返回空结果? A: 确保先有文档入库,可以运行 `test_full_pipeline_final.py` 来测试入库。 --- ## 许可证 本项目采用内部许可证,未经授权不得用于商业用途。 --- ## 联系方式 如有问题或建议,请联系项目维护者。