开发计划.md 9.5 KB

schedule-embedding-api 开发计划

一、模块定位

  • REST API:对外提供 HTTP 接口,支持同步入库、搜索、文档管理
  • Kafka 消费:监听 embedding 专用 Topic,异步消费文档入库任务,内部完成分段、向量化、入库

二、数据流

                    ┌─────────────────────────────────────────┐
                    │           schedule-producer              │
                    │    发送文档入库任务到 embedding-topic     │
                    └────────────────────┬──────────────────────┘
                                        │
                                        ↓
                    ┌─────────────────────────────────────────┐
                    │        schedule-embedding-api             │
                    │  ┌─────────────────┐ ┌────────────────┐ │
                    │  │ Kafka Consumer  │ │   REST API      │ │
                    │  │ @KafkaListener  │ │  POST/GET/...  │ │
                    │  └────────┬────────┘ └───────┬──────────┘ │
                    │          │                  │            │
                    │          └────────┬─────────┘            │
                    │                   ↓                       │
                    │     TextEmbeddingService (分段→向量化→入库)│
                    └───────────────────┬──────────────────────┘
                                        │
                      ┌─────────────────┼─────────────────┐
                      ↓                 ↓                  ↓
              Embedding 接口      Elasticsearch      (可选) 回调通知

三、TodoList

1. 项目骨架与依赖

  • 1.1 新建 schedule-embedding-api 模块,继承父 pom
  • 1.2 添加 Spring Boot Web、Validation 依赖
  • 1.3 添加 Spring Kafka 依赖
  • 1.4 添加 Elasticsearch Rest High Level Client (7.17.x,兼容 JDK8)
  • 1.5 添加 OkHttp 依赖(调用 Embedding 接口)
  • 1.6 添加 Jackson、Lombok、Commons-Lang3
  • 1.7 新建 ScheduleEmbeddingApplication 启动类
  • 1.8 配置 application.yml 基础结构(端口、应用名)

2. 配置与基础设施

  • 2.1 配置 Embedding 接口:url、apiKey、model、timeout、batchSize
  • 2.2 配置 Elasticsearch:host、indexName、shards、replicas、超时
  • 2.3 配置 Chunk 分段:maxLength、minLength、overlap、mode
  • 2.4 配置 Kafka:bootstrap-servers、group-id、embedding-topic、enable-auto-commit
  • 2.5 新建 ElasticsearchConfig,创建 RestHighLevelClient Bean
  • 2.6 新建 EmbeddingConfig,读取 Embedding 相关配置
  • 2.7 新建 ChunkConfig,读取分段参数
  • 2.8 新建 KafkaConsumerConfig,配置反序列化、ack 模式

3. 数据模型

  • 3.1 新建 Chunk 实体:docId、chunkId、chunkIndex、content、contentLength、metadata
  • 3.2 新建 TextDocument 实体:与 ES 文档结构对应
  • 3.3 新建 IndexRequest DTO:docId、fileName、fullText、metadata
  • 3.4 新建 BatchIndexRequest DTO:List<IndexRequest>
  • 3.5 新建 SearchRequest DTO:query、topK、filters
  • 3.6 新建 SearchResult DTO:chunkId、content、score、metadata
  • 3.7 新建 IndexResult VO:success、docId、chunkCount、message
  • 3.8 新建 BatchIndexResult VO:totalCount、successCount、failedCount、failedItems
  • 3.9 定义 Kafka 消息体 DTO(如 EmbeddingTaskMessage):docId、fullText、metadata、回调 URL 等

4. 文本分段

  • 4.1 新建 TextSplitter 组件,注入分段配置
  • 4.2 实现 preprocess:统一换行、合并空格、trim
  • 4.3 实现 splitByParagraph:按双换行切分段落
  • 4.4 实现 split:按段落优先,超长再按 maxLength 切分,支持 overlap
  • 4.5 实现 createChunk:生成 chunkId、chunkIndex,填充 metadata
  • 4.6 编写 TextSplitterTest 单元测试

5. Embedding 客户端

  • 5.1 新建 EmbeddingClient 组件,注入 apiUrl、apiKey、model
  • 5.2 实现 getEmbedding(String text):单文本向量化,返回 4096 维 float[]
  • 5.3 实现 getBatchEmbeddings(List<String> texts):批量向量化(≤128 条)
  • 5.4 实现 HTTP 请求构造:POST /embeddings,Authorization: Bearer {key}
  • 5.5 实现响应解析:从 data[0].embedding 提取向量
  • 5.6 实现重试:指数退避,最多 3 次,超时 30 秒
  • 5.7 校验返回向量维度是否为 4096
  • 5.8 新建 EmbeddingException 自定义异常

6. Elasticsearch 操作

  • 6.1 新建 ElasticsearchRepository,注入 RestHighLevelClient 与 indexName
  • 6.2 实现 createIndex:创建 contract_chunks 索引,mapping 含 dense_vector(4096)、cosine
  • 6.3 实现 indexDocument:单文档写入,使用 chunkId 作为 _id(幂等)
  • 6.4 实现 bulkIndex:BulkRequest 批量写入
  • 6.5 实现 knnSearch:KNN 查询,cosine 相似度,返回 topK
  • 6.6 实现 searchWithFilters:KNN + bool filter(contractType、partyA 等)
  • 6.7 实现 findByDocId:按 doc_id 查询,按 chunk_index 排序
  • 6.8 实现 deleteByDocId:_delete_by_query 按 doc_id 删除
  • 6.9 实现 updateByDocId:_update_by_query 批量更新 metadata
  • 6.10 新建 ElasticsearchException 自定义异常
  • 6.11 启动时检查索引是否存在,不存在则自动创建

7. 业务服务层

  • 7.1 新建 TextEmbeddingService,注入 EmbeddingClient、ElasticsearchRepository、TextSplitter
  • 7.2 实现 indexDocument:全量文本 → 分段 → 批量向量化 → 批量入库
  • 7.3 实现 batchIndexDocuments:多文档入库,按 128 条分批向量化
  • 7.4 实现 search:query 向量化 → KNN 搜索 → 映射为 SearchResult
  • 7.5 实现 hybridSearch:query 向量化 + 业务过滤 → 混合查询
  • 7.6 新建 DocumentService,注入 ElasticsearchRepository
  • 7.7 实现 getDocument:按 docId 查所有 chunks
  • 7.8 实现 deleteDocument:按 docId 删除所有 chunks
  • 7.9 实现 updateDocument:按 docId 更新 metadata(_update_by_query)

8. REST 控制器

  • 8.1 新建 TextEmbeddingController,基础路径 /api/v1
  • 8.2 POST /documents/index:单个文档入库,请求体 IndexRequest,返回 IndexResult
  • 8.3 POST /documents/batch-index:批量入库,请求体 BatchIndexRequest,返回 BatchIndexResult
  • 8.4 新建 VectorSearchController
  • 8.5 POST /search:向量搜索,请求体 SearchRequest,返回 List<SearchResult>
  • 8.6 POST /search/hybrid:混合搜索(向量 + 业务过滤)
  • 8.7 新建 DocumentController
  • 8.8 GET /documents/{docId}:查询文档 chunks,支持 sortBy、order 参数
  • 8.9 DELETE /documents/{docId}:删除文档所有 chunks
  • 8.10 PUT /documents/{docId}:更新文档 metadata
  • 8.11 配置 Actuator,暴露 /actuator/health
  • 8.12 添加统一异常处理(@ControllerAdvice),返回标准错误结构

9. Kafka 消费

  • 9.1 新建 EmbeddingTaskMessage:解析 Kafka 消息体(docId、fullText、metadata、callbackUrl 等)
  • 9.2 新建 EmbeddingTaskListener,使用 @KafkaListener 监听 embedding-topic
  • 9.3 消费逻辑:解析消息 → 调用 TextEmbeddingService.indexDocument → ack
  • 9.4 处理失败:捕获异常、记录日志,根据策略决定 ack 或 nack(支持重试)
  • 9.5 可选:消费成功后,如有 callbackUrl,HTTP 回调通知上游
  • 9.6 配置 kafka.topics.embedding 为 embedding 专用 Topic 名称
  • 9.7 配置消费者 group-id(如 embedding-api-group),与现有 consumer 区分

10. Producer 联动(可选)

  • 10.1 在 schedule-producer 中增加“发送文档入库任务”能力(若尚未支持)
  • 10.2 定义 embedding-topic,与 producer 约定消息格式(JSON:docId、fullText、metadata)
  • 10.3 文档解析完成后,将 fullText 发送到 embedding-topic

11. 异常与重试

  • 11.1 Embedding 调用:超时 30s、最多重试 3 次、指数退避
  • 11.2 ES Bulk 部分失败:记录 failedItems,整体不抛异常,返回失败明细
  • 11.3 Kafka 消费异常:记录 failed 消息,支持 nack 或写入死信队列(若需)

12. 测试与验证

  • 12.1 TextSplitter 单元测试:多段文本、超长段落、边界情况
  • 12.2 EmbeddingClient 单元测试:Mock HTTP,校验请求与解析
  • 12.3 ElasticsearchRepository 集成测试:需本地或测试 ES
  • 12.4 TextEmbeddingService 集成测试:分段 + 向量化 + 入库 全流程
  • 12.5 REST API 接口测试:curl 或 Postman 验证
  • 12.6 Kafka 端到端测试:发送消息 → 消费 → 查 ES 验证入库
  • 12.7 健康检查:/actuator/health 含 ES、Embedding 连通性(可选)

参考文档schedule-embedding-api/向量化入库方案.md