┌─────────────────────────────────────────┐
│ schedule-producer │
│ 发送文档入库任务到 embedding-topic │
└────────────────────┬──────────────────────┘
│
↓
┌─────────────────────────────────────────┐
│ schedule-embedding-api │
│ ┌─────────────────┐ ┌────────────────┐ │
│ │ Kafka Consumer │ │ REST API │ │
│ │ @KafkaListener │ │ POST/GET/... │ │
│ └────────┬────────┘ └───────┬──────────┘ │
│ │ │ │
│ └────────┬─────────┘ │
│ ↓ │
│ TextEmbeddingService (分段→向量化→入库)│
└───────────────────┬──────────────────────┘
│
┌─────────────────┼─────────────────┐
↓ ↓ ↓
Embedding 接口 Elasticsearch (可选) 回调通知
schedule-embedding-api 模块,继承父 pomScheduleEmbeddingApplication 启动类application.yml 基础结构(端口、应用名)ElasticsearchConfig,创建 RestHighLevelClient BeanEmbeddingConfig,读取 Embedding 相关配置ChunkConfig,读取分段参数KafkaConsumerConfig,配置反序列化、ack 模式Chunk 实体:docId、chunkId、chunkIndex、content、contentLength、metadataTextDocument 实体:与 ES 文档结构对应IndexRequest DTO:docId、fileName、fullText、metadataBatchIndexRequest DTO:List<IndexRequest>SearchRequest DTO:query、topK、filtersSearchResult DTO:chunkId、content、score、metadataIndexResult VO:success、docId、chunkCount、messageBatchIndexResult VO:totalCount、successCount、failedCount、failedItemsEmbeddingTaskMessage):docId、fullText、metadata、回调 URL 等TextSplitter 组件,注入分段配置preprocess:统一换行、合并空格、trimsplitByParagraph:按双换行切分段落split:按段落优先,超长再按 maxLength 切分,支持 overlapcreateChunk:生成 chunkId、chunkIndex,填充 metadataTextSplitterTest 单元测试EmbeddingClient 组件,注入 apiUrl、apiKey、modelgetEmbedding(String text):单文本向量化,返回 4096 维 float[]getBatchEmbeddings(List<String> texts):批量向量化(≤128 条)EmbeddingException 自定义异常ElasticsearchRepository,注入 RestHighLevelClient 与 indexNamecreateIndex:创建 contract_chunks 索引,mapping 含 dense_vector(4096)、cosineindexDocument:单文档写入,使用 chunkId 作为 _id(幂等)bulkIndex:BulkRequest 批量写入knnSearch:KNN 查询,cosine 相似度,返回 topKsearchWithFilters:KNN + bool filter(contractType、partyA 等)findByDocId:按 doc_id 查询,按 chunk_index 排序deleteByDocId:_delete_by_query 按 doc_id 删除updateByDocId:_update_by_query 批量更新 metadataElasticsearchException 自定义异常TextEmbeddingService,注入 EmbeddingClient、ElasticsearchRepository、TextSplitterindexDocument:全量文本 → 分段 → 批量向量化 → 批量入库batchIndexDocuments:多文档入库,按 128 条分批向量化search:query 向量化 → KNN 搜索 → 映射为 SearchResulthybridSearch:query 向量化 + 业务过滤 → 混合查询DocumentService,注入 ElasticsearchRepositorygetDocument:按 docId 查所有 chunksdeleteDocument:按 docId 删除所有 chunksupdateDocument:按 docId 更新 metadata(_update_by_query)TextEmbeddingController,基础路径 /api/v1POST /documents/index:单个文档入库,请求体 IndexRequest,返回 IndexResultPOST /documents/batch-index:批量入库,请求体 BatchIndexRequest,返回 BatchIndexResultVectorSearchControllerPOST /search:向量搜索,请求体 SearchRequest,返回 List<SearchResult>POST /search/hybrid:混合搜索(向量 + 业务过滤)DocumentControllerGET /documents/{docId}:查询文档 chunks,支持 sortBy、order 参数DELETE /documents/{docId}:删除文档所有 chunksPUT /documents/{docId}:更新文档 metadata/actuator/healthEmbeddingTaskMessage:解析 Kafka 消息体(docId、fullText、metadata、callbackUrl 等)EmbeddingTaskListener,使用 @KafkaListener 监听 embedding-topickafka.topics.embedding 为 embedding 专用 Topic 名称参考文档:schedule-embedding-api/向量化入库方案.md