# schedule-embedding-api 开发计划 ## 一、模块定位 - **REST API**:对外提供 HTTP 接口,支持同步入库、搜索、文档管理 - **Kafka 消费**:监听 embedding 专用 Topic,异步消费文档入库任务,内部完成分段、向量化、入库 ## 二、数据流 ``` ┌─────────────────────────────────────────┐ │ schedule-producer │ │ 发送文档入库任务到 embedding-topic │ └────────────────────┬──────────────────────┘ │ ↓ ┌─────────────────────────────────────────┐ │ schedule-embedding-api │ │ ┌─────────────────┐ ┌────────────────┐ │ │ │ Kafka Consumer │ │ REST API │ │ │ │ @KafkaListener │ │ POST/GET/... │ │ │ └────────┬────────┘ └───────┬──────────┘ │ │ │ │ │ │ └────────┬─────────┘ │ │ ↓ │ │ TextEmbeddingService (分段→向量化→入库)│ └───────────────────┬──────────────────────┘ │ ┌─────────────────┼─────────────────┐ ↓ ↓ ↓ Embedding 接口 Elasticsearch (可选) 回调通知 ``` --- ## 三、TodoList ### 1. 项目骨架与依赖 - [ ] 1.1 新建 `schedule-embedding-api` 模块,继承父 pom - [ ] 1.2 添加 Spring Boot Web、Validation 依赖 - [ ] 1.3 添加 Spring Kafka 依赖 - [ ] 1.4 添加 Elasticsearch Rest High Level Client (7.17.x,兼容 JDK8) - [ ] 1.5 添加 OkHttp 依赖(调用 Embedding 接口) - [ ] 1.6 添加 Jackson、Lombok、Commons-Lang3 - [ ] 1.7 新建 `ScheduleEmbeddingApplication` 启动类 - [ ] 1.8 配置 `application.yml` 基础结构(端口、应用名) ### 2. 配置与基础设施 - [ ] 2.1 配置 Embedding 接口:url、apiKey、model、timeout、batchSize - [ ] 2.2 配置 Elasticsearch:host、indexName、shards、replicas、超时 - [ ] 2.3 配置 Chunk 分段:maxLength、minLength、overlap、mode - [ ] 2.4 配置 Kafka:bootstrap-servers、group-id、embedding-topic、enable-auto-commit - [ ] 2.5 新建 `ElasticsearchConfig`,创建 `RestHighLevelClient` Bean - [ ] 2.6 新建 `EmbeddingConfig`,读取 Embedding 相关配置 - [ ] 2.7 新建 `ChunkConfig`,读取分段参数 - [ ] 2.8 新建 `KafkaConsumerConfig`,配置反序列化、ack 模式 ### 3. 数据模型 - [ ] 3.1 新建 `Chunk` 实体:docId、chunkId、chunkIndex、content、contentLength、metadata - [ ] 3.2 新建 `TextDocument` 实体:与 ES 文档结构对应 - [ ] 3.3 新建 `IndexRequest` DTO:docId、fileName、fullText、metadata - [ ] 3.4 新建 `BatchIndexRequest` DTO:List\ - [ ] 3.5 新建 `SearchRequest` DTO:query、topK、filters - [ ] 3.6 新建 `SearchResult` DTO:chunkId、content、score、metadata - [ ] 3.7 新建 `IndexResult` VO:success、docId、chunkCount、message - [ ] 3.8 新建 `BatchIndexResult` VO:totalCount、successCount、failedCount、failedItems - [ ] 3.9 定义 Kafka 消息体 DTO(如 `EmbeddingTaskMessage`):docId、fullText、metadata、回调 URL 等 ### 4. 文本分段 - [ ] 4.1 新建 `TextSplitter` 组件,注入分段配置 - [ ] 4.2 实现 `preprocess`:统一换行、合并空格、trim - [ ] 4.3 实现 `splitByParagraph`:按双换行切分段落 - [ ] 4.4 实现 `split`:按段落优先,超长再按 maxLength 切分,支持 overlap - [ ] 4.5 实现 `createChunk`:生成 chunkId、chunkIndex,填充 metadata - [ ] 4.6 编写 `TextSplitterTest` 单元测试 ### 5. Embedding 客户端 - [ ] 5.1 新建 `EmbeddingClient` 组件,注入 apiUrl、apiKey、model - [ ] 5.2 实现 `getEmbedding(String text)`:单文本向量化,返回 4096 维 float[] - [ ] 5.3 实现 `getBatchEmbeddings(List texts)`:批量向量化(≤128 条) - [ ] 5.4 实现 HTTP 请求构造:POST /embeddings,Authorization: Bearer {key} - [ ] 5.5 实现响应解析:从 data[0].embedding 提取向量 - [ ] 5.6 实现重试:指数退避,最多 3 次,超时 30 秒 - [ ] 5.7 校验返回向量维度是否为 4096 - [ ] 5.8 新建 `EmbeddingException` 自定义异常 ### 6. Elasticsearch 操作 - [ ] 6.1 新建 `ElasticsearchRepository`,注入 RestHighLevelClient 与 indexName - [ ] 6.2 实现 `createIndex`:创建 contract_chunks 索引,mapping 含 dense_vector(4096)、cosine - [ ] 6.3 实现 `indexDocument`:单文档写入,使用 chunkId 作为 _id(幂等) - [ ] 6.4 实现 `bulkIndex`:BulkRequest 批量写入 - [ ] 6.5 实现 `knnSearch`:KNN 查询,cosine 相似度,返回 topK - [ ] 6.6 实现 `searchWithFilters`:KNN + bool filter(contractType、partyA 等) - [ ] 6.7 实现 `findByDocId`:按 doc_id 查询,按 chunk_index 排序 - [ ] 6.8 实现 `deleteByDocId`:_delete_by_query 按 doc_id 删除 - [ ] 6.9 实现 `updateByDocId`:_update_by_query 批量更新 metadata - [ ] 6.10 新建 `ElasticsearchException` 自定义异常 - [ ] 6.11 启动时检查索引是否存在,不存在则自动创建 ### 7. 业务服务层 - [ ] 7.1 新建 `TextEmbeddingService`,注入 EmbeddingClient、ElasticsearchRepository、TextSplitter - [ ] 7.2 实现 `indexDocument`:全量文本 → 分段 → 批量向量化 → 批量入库 - [ ] 7.3 实现 `batchIndexDocuments`:多文档入库,按 128 条分批向量化 - [ ] 7.4 实现 `search`:query 向量化 → KNN 搜索 → 映射为 SearchResult - [ ] 7.5 实现 `hybridSearch`:query 向量化 + 业务过滤 → 混合查询 - [ ] 7.6 新建 `DocumentService`,注入 ElasticsearchRepository - [ ] 7.7 实现 `getDocument`:按 docId 查所有 chunks - [ ] 7.8 实现 `deleteDocument`:按 docId 删除所有 chunks - [ ] 7.9 实现 `updateDocument`:按 docId 更新 metadata(_update_by_query) ### 8. REST 控制器 - [ ] 8.1 新建 `TextEmbeddingController`,基础路径 `/api/v1` - [ ] 8.2 `POST /documents/index`:单个文档入库,请求体 IndexRequest,返回 IndexResult - [ ] 8.3 `POST /documents/batch-index`:批量入库,请求体 BatchIndexRequest,返回 BatchIndexResult - [ ] 8.4 新建 `VectorSearchController` - [ ] 8.5 `POST /search`:向量搜索,请求体 SearchRequest,返回 List\ - [ ] 8.6 `POST /search/hybrid`:混合搜索(向量 + 业务过滤) - [ ] 8.7 新建 `DocumentController` - [ ] 8.8 `GET /documents/{docId}`:查询文档 chunks,支持 sortBy、order 参数 - [ ] 8.9 `DELETE /documents/{docId}`:删除文档所有 chunks - [ ] 8.10 `PUT /documents/{docId}`:更新文档 metadata - [ ] 8.11 配置 Actuator,暴露 `/actuator/health` - [ ] 8.12 添加统一异常处理(@ControllerAdvice),返回标准错误结构 ### 9. Kafka 消费 - [ ] 9.1 新建 `EmbeddingTaskMessage`:解析 Kafka 消息体(docId、fullText、metadata、callbackUrl 等) - [ ] 9.2 新建 `EmbeddingTaskListener`,使用 @KafkaListener 监听 embedding-topic - [ ] 9.3 消费逻辑:解析消息 → 调用 TextEmbeddingService.indexDocument → ack - [ ] 9.4 处理失败:捕获异常、记录日志,根据策略决定 ack 或 nack(支持重试) - [ ] 9.5 可选:消费成功后,如有 callbackUrl,HTTP 回调通知上游 - [ ] 9.6 配置 `kafka.topics.embedding` 为 embedding 专用 Topic 名称 - [ ] 9.7 配置消费者 group-id(如 embedding-api-group),与现有 consumer 区分 ### 10. Producer 联动(可选) - [ ] 10.1 在 schedule-producer 中增加“发送文档入库任务”能力(若尚未支持) - [ ] 10.2 定义 embedding-topic,与 producer 约定消息格式(JSON:docId、fullText、metadata) - [ ] 10.3 文档解析完成后,将 fullText 发送到 embedding-topic ### 11. 异常与重试 - [ ] 11.1 Embedding 调用:超时 30s、最多重试 3 次、指数退避 - [ ] 11.2 ES Bulk 部分失败:记录 failedItems,整体不抛异常,返回失败明细 - [ ] 11.3 Kafka 消费异常:记录 failed 消息,支持 nack 或写入死信队列(若需) ### 12. 测试与验证 - [ ] 12.1 TextSplitter 单元测试:多段文本、超长段落、边界情况 - [ ] 12.2 EmbeddingClient 单元测试:Mock HTTP,校验请求与解析 - [ ] 12.3 ElasticsearchRepository 集成测试:需本地或测试 ES - [ ] 12.4 TextEmbeddingService 集成测试:分段 + 向量化 + 入库 全流程 - [ ] 12.5 REST API 接口测试:curl 或 Postman 验证 - [ ] 12.6 Kafka 端到端测试:发送消息 → 消费 → 查 ES 验证入库 - [ ] 12.7 健康检查:/actuator/health 含 ES、Embedding 连通性(可选) --- **参考文档**:`schedule-embedding-api/向量化入库方案.md`