非结构解析四级调度模块

chaixuhong fc870a3360 flink test 1 долоо хоног өмнө
.idea 795f64b834 Merge branch 'embedding-jdk' 2 долоо хоног өмнө
docs fc870a3360 flink test 1 долоо хоног өмнө
duomotai fc870a3360 flink test 1 долоо хоног өмнө
parse-service fc870a3360 flink test 1 долоо хоног өмнө
schedule-admin fc870a3360 flink test 1 долоо хоног өмнө
schedule-embedding-api fc870a3360 flink test 1 долоо хоног өмнө
schedule-flink fc870a3360 flink test 1 долоо хоног өмнө
.gitignore a4335e9e64 排除venv 2 долоо хоног өмнө
Flink_Docker_使用指南.md fc870a3360 flink test 1 долоо хоног өмнө
README.md fc870a3360 flink test 1 долоо хоног өмнө
docker-compose-flink.yml fc870a3360 flink test 1 долоо хоног өмнө
docker-compose.yml fc870a3360 flink test 1 долоо хоног өмнө
pom.xml fc870a3360 flink test 1 долоо хоног өмнө
send_task_to_kafka.py fc870a3360 flink test 1 долоо хоног өмнө
send_task_to_kafka_by_type.py fc870a3360 flink test 1 долоо хоног өмнө
test_concurrent.py fc870a3360 flink test 1 долоо хоног өмнө
test_full_pipeline_final.py fc870a3360 flink test 1 долоо хоног өмнө
test_kafka_mode.py fc870a3360 flink test 1 долоо хоног өмнө
test_multimodal_search.py fc870a3360 flink test 1 долоо хоног өмнө
test_search_final.py fc870a3360 flink test 1 долоо хоног өмнө
多模态数据前置分类方案.md fc870a3360 flink test 1 долоо хоног өмнө
多模态智能检索方案.md fc870a3360 flink test 1 долоо хоног өмнө
完整流程测试指南.md fc870a3360 flink test 1 долоо хоног өмнө

README.md

多模态解析系统

基于 Flink + FastAPI + Elasticsearch 的多模态文档解析与智能检索系统。


项目简介

本系统实现了多模态文档(PDF/Word/图片/音频/视频)的解析、向量化存储和智能检索功能。

核心特性

  • 📄 多模态解析:支持 PDF、Office 文档、图片、音频、视频等多种格式
  • 🔍 智能检索:基于向量的语义搜索,支持文本查多模态
  • 批流一体:支持批量历史数据处理和实时流处理(Flink)
  • 🎯 可扩展架构:模块化设计,易于扩展新的解析器和检索方式

系统架构

┌─────────────────────────────────────────────────────────────┐
│                         数据接入层                            │
│  (对象存储 / Kafka / 本地文件)                               │
└──────────────────────┬──────────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        │              │              │
        ▼              ▼              ▼
┌──────────────┐ ┌──────────┐ ┌─────────────────┐
│  批量处理    │ │  实时流   │ │   直接调用       │
│  (Flink)     │ │ (Flink)   │ │ (HTTP API)      │
└──────┬───────┘ └────┬─────┘ └───────┬─────────┘
       │               │              │
       └───────────────┼──────────────┘
                       ▼
        ┌───────────────────────────────┐
        │      parse-service            │
        │  (Python FastAPI)            │
        │  - PDF/Word/图片解析         │
        │  - 音频/视频解析             │
        └───────────────┬───────────────┘
                        │
            ┌───────────┴───────────┐
            ▼                       ▼
    ┌───────────────┐    ┌───────────────────────┐
    │   Kafka       │    │ schedule-embedding-api│
    │  (可选解耦)   │    │  (向量化 + ES 存储)   │
    └───────┬───────┘    └───────────┬───────────┘
            │                        │
            └───────────┬────────────┘
                        ▼
            ┌───────────────────────┐
            │  Elasticsearch 8.x    │
            │  (向量存储 + KNN 搜索) │
            └───────────────────────┘

完整数据流程

方式一:直接 HTTP 调用(简单模式)

测试脚本 → parse-service → schedule-embedding-api → Elasticsearch

方式二:Kafka 解耦 + Flink 批流一体(推荐生产模式)

数据生产者 → Kafka → Flink Job → parse-service → schedule-embedding-api → Elasticsearch

Flink 定位

  • 不取代 parse-service(核心解析逻辑保持不变)
  • 作为批流一体引擎,负责任务编排和并行处理
  • 提供 Exactly-Once 语义、Savepoint、状态管理、背压控制

模块说明

模块 技术栈 端口 说明
parse-service Python + FastAPI 8000 多模态文档解析服务
schedule-embedding-api Java 17 + Spring Boot 8084 向量化存储与检索服务
schedule-flink Java 17 + Flink 1.18 - 批流一体处理模块(可选)
schedule-admin Java 17 + Spring Boot - 管理后台

已实现功能

✅ 1. 多模态文档解析 + 批流一体架构

  • PDF 解析:原生文本提取 + 扫描件 OCR
  • Office 文档:Word (.docx/.doc)、PPT、Excel 解析
  • 图片解析:OCR 文字识别
  • 音频解析:ASR 语音转文字
  • 视频解析:关键帧提取 + 音频转文字

✅ 2. 向量化存储

  • 基于 Qwen3-Embedding-8B 生成 4096 维向量
  • 自动文本分片(Chunking)
  • Elasticsearch 8.x 向量存储
  • 动态字段支持(任意 metadata)

✅ 3. 智能检索

  • 向量搜索 (POST /api/v1/search):语义相似度搜索
  • 混合搜索 (POST /api/v1/search/hybrid):向量搜索 + 业务字段过滤
  • 多模态检索 API (POST /api/v2/multimodal/search):统一多模态查询接口(框架已搭建)

✅ 4. 批流一体处理 (Flink)

  • Flink Job 框架MultimodalParseJob.java - 批流一体主 Job
  • Async I/O:异步调用 parse-service 和 embedding-api
  • 纯 Java 测试PureJavaTest.java - 不依赖 Flink 的端到端测试
  • 简单客户端SimpleParseServiceClient.javaSimpleEmbeddingApiClient.java - 无日志依赖的轻量客户端

✅ 5. 测试脚本

脚本 说明 使用方法
test_full_pipeline_final.py 完整流程测试:parse-service → embedding-api → ES python3 test_full_pipeline_final.py
test_search_final.py 向量检索功能测试 python3 test_search_final.py
test_multimodal_search.py 多模态检索 API 测试 python3 test_multimodal_search.py
test_concurrent.py 并发测试:测试 parse-service 并发处理能力 python3 test_concurrent.py

待实现功能

🚧 1. 多模态检索高级功能

根据《多模态智能检索方案.md》,后续可实现:

  • 图片查多模态:上传图片检索相似内容
  • 图文混合查询:文本 + 图片联合查询
  • 音频/视频查询:语音提问、视频片段检索
  • 跨模态理解与推理:接入多模态大模型(Qwen-VL 等)
  • 多模态答案生成:生成带配图的结构化答案

🚧 2. 多模态数据前置分类

根据《多模态数据前置分类方案.md》,后续可实现:

  • 三级分类机制:数仓元数据匹配 → 规则引擎 → 轻量 AI 预识别
  • 分类准确率:≥ 98%
  • 人工审核兜底:低置信度文件进入人工审核队列

🚧 3. Flink 集成

  • 解决 logback 版本冲突问题
  • 本地 Flink Job 测试
  • 批处理模式(对象存储读取)
  • 流处理模式(Kafka 消费)

快速开始

环境要求

  • JDK: 17+
  • Python: 3.9+
  • Elasticsearch: 8.x
  • Kafka: 2.8+ (可选)
  • Maven: 3.6+

1. 启动 parse-service

cd parse-service
python main.py

服务地址: http://localhost:8000

健康检查: curl http://localhost:8000/health

2. 启动 schedule-embedding-api

cd schedule-embedding-api
jdk17 && mvn spring-boot:run

服务地址: http://localhost:8084

健康检查: curl http://localhost:8084/actuator/health

3. 运行完整流程测试(HTTP 直接调用)

python3 test_full_pipeline_final.py

4. 运行并发测试

python3 test_concurrent.py

可选测试模式:

  • 小规模 (5 任务, 2 并发)
  • 中规模 (10 任务, 5 并发)
  • 大规模 (20 任务, 10 并发)
  • 自定义

5. 运行 Flink Job(可选)

无需部署 Flink 集群! 可以直接本地运行:

cd schedule-flink
# 纯 Java 测试(不依赖 Flink,验证端到端流程)
jdk17 && mvn exec:java -Dexec.mainClass="com.yusys.flink.PureJavaTest"

# 或本地 Flink 模式(需要解决 logback 依赖冲突)
jdk17 && mvn exec:java -Dexec.mainClass="com.yusys.flink.MultimodalParseJob"

Flink 部署模式(可选)

  • 开发测试:本地运行(无需部署)
  • 小集群:Standalone 模式
  • 生产环境:Docker Compose 或 Kubernetes

API 文档

parse-service API

接口 方法 说明
/health GET 健康检查
/api/v1/parse/path POST 解析文件(通过路径)

schedule-embedding-api API

详细文档见:schedule-embedding-api/docs/API接口文档.md

接口 方法 说明
/actuator/health GET 健康检查
/api/v1/documents/index POST 单个文档入库
/api/v1/documents/batch-index POST 批量文档入库
/api/v1/search POST 向量搜索
/api/v1/search/hybrid POST 混合搜索
/api/v2/multimodal/search POST 多模态统一检索

测试脚本使用说明

1. test_full_pipeline_final.py

用途:测试完整的解析流程(parse-service → embedding-api → Elasticsearch)

使用方法

python3 test_full_pipeline_final.py

功能

  • 检查 parse-service 和 schedule-embedding-api 健康状态
  • 解析测试文件(5.docx、20以内口算.pdf)
  • 调用 embedding-api 向量化并入库
  • 显示测试总结

2. test_search_final.py

用途:测试向量检索功能

使用方法

python3 test_search_final.py

功能

  • 测试查询:"数学"、"口算题"、"一年级"
  • 显示 Top-K 检索结果
  • 显示相似度分数和内容片段

3. test_multimodal_search.py

用途:测试多模态检索 API(v2 版本)

使用方法

python3 test_multimodal_search.py

功能

  • 测试 TEXT_ONLY 模式(兼容现有能力)
  • 测试 TEXT_TO_MULTIMODAL 模式
  • 测试 IMAGE_TO_MULTIMODAL 模式(待实现)
  • 显示完整响应

4. test_concurrent.py ⭐

用途:测试 parse-service 的并发处理能力

使用方法

python3 test_concurrent.py

功能

  • 检查 parse-service 和 schedule-embedding-api 健康状态
  • 支持多种并发模式(小规模/中规模/大规模/自定义)
  • 使用线程池并发调用 parse-service
  • 实时显示每个任务的执行状态
  • 统计吞吐量、平均耗时、成功/失败数量

测试指标

  • 总任务数
  • 并发数(工作线程数)
  • 成功/失败数量
  • 总耗时
  • 吞吐量(任务/秒)
  • 平均解析耗时
  • 平均向量化耗时
  • 平均总耗时

项目结构

four-level-schedule/
├── README.md                          # 本文件
├── pom.xml                            # 父 POM
├── docker-compose.yml                 # Docker Compose 配置
├── 多模态数据前置分类方案.md          # 前置分类方案文档
├── 多模态智能检索方案.md              # 智能检索方案文档
│
├── parse-service/                     # 多模态解析服务
│   ├── main.py                        # 入口文件
│   ├── core/                          # 核心路由
│   ├── parsers/                       # 解析器
│   ├── models/                        # 数据模型
│   └── utils/                         # 工具类
│
├── schedule-embedding-api/            # 向量化存储与检索服务
│   ├── pom.xml
│   ├── docs/                          # API 文档
│   └── src/main/java/cn/com/yusys/manager/
│       ├── controller/                 # 控制器
│       │   ├── VectorSearchController.java
│       │   └── MultimodalSearchController.java  # 多模态检索(新增)
│       ├── service/                    # 服务层
│       ├── repository/                 # 数据访问层
│       └── model/
│           ├── dto/                    # DTO
│           │   ├── MultimodalSearchRequest.java   # 多模态请求(新增)
│           │   └── MultimodalSearchResponse.java  # 多模态响应(新增)
│           └── entity/
│
├── schedule-flink/                     # Flink 批流一体处理模块
│   ├── pom.xml
│   └── src/main/java/com/yusys/flink/
│       ├── MultimodalParseJob.java    # Flink 主 Job
│       ├── SimpleLocalTest.java       # 简单本地测试
│       ├── PureJavaTest.java          # 纯 Java 测试(无 Flink)
│       └── client/                     # HTTP 客户端
│
├── schedule-admin/                     # 管理后台
│
├── test_full_pipeline_final.py        # 完整流程测试
├── test_search_final.py               # 向量检索测试
└── test_multimodal_search.py          # 多模态检索测试

配置说明

parse-service 配置

见:parse-service/config.py

主要配置项:

  • MinerU API 地址
  • Qwen3-VL API 地址
  • Qwen3-ASR API 地址
  • Kafka 配置(可选)

schedule-embedding-api 配置

见:schedule-embedding-api/src/main/resources/application.yml

主要配置项:

  • Elasticsearch 连接信息
  • Embedding API 地址
  • MySQL 连接信息
  • Kafka 配置(可选)

常见问题

Q: parse-service 启动失败?

A: 检查 Python 依赖是否安装:

cd parse-service
pip install -r requirements.txt

Q: schedule-embedding-api 连接 ES 失败?

A: 检查 Elasticsearch 是否启动,配置中的地址是否正确。

Q: 向量搜索返回空结果?

A: 确保先有文档入库,可以运行 test_full_pipeline_final.py 来测试入库。


许可证

本项目采用内部许可证,未经授权不得用于商业用途。


联系方式

如有问题或建议,请联系项目维护者。