# Flink 优化任务 ## 当前状态 - ✅ RealFlinkJob 基础版本已完成 - ✅ 纯 MapFunction 实现 - ❌ 使用 Async I/O(当前串行调用) - ❌ 从 Kafka 读取任务(当前硬编码测试数据) - ❌ 状态管理和容错 --- ## 待优化项 ### 1. 改用 Async I/O 调用 parse-service **优先级:高** **原因**: - 当前 MapFunction 是串行的,效率低 - Async I/O 可以并发调用多个 parse-service 实例 - 支持超时和重试 **实现要点**: ```java // 替换 .map() 为 AsyncDataStream AsyncDataStream.unorderedWait( taskStream, new ParseAsyncFunction(parseServiceUrl), 60, TimeUnit.SECONDS, // 超时时间 100 // 最大并发请求数 ) ``` **参考文件**: - `RealFlinkJob.java` - 新建 `ParseAsyncFunction.java` --- ### 2. 从 Kafka 读取任务(替代硬编码测试数据) **优先级:高** **原因**: - 真实生产环境是从 Kafka 消费任务 - 当前是 `env.fromElements(...)` 硬编码 **实现要点**: ```java KafkaSource source = KafkaSource.builder() .setBootstrapServers("kafka:9092") .setTopics("parse-task-topic") .setGroupId("flink-parse-group") .setValueOnlyDeserializer(new ParseTaskDeserializer()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") ``` **参考文件**: - `schedule-producer` 看消息格式 - `schedule-consumer` 看消费逻辑 --- ### 3. 实现容错和状态管理 **优先级:中** **内容**: - Exactly-Once 语义配置 - Checkpoint 配置(生产环境) - 失败重试策略 - 死信队列(处理失败的任务) --- ### 4. KeyBy 按文件类型分组(优化局部性) **优先级:低** **原因**: - 相同类型的文件可能在同一个 parse-service 实例缓存更好 - 可以针对不同文件类型调优不同的并发度 --- ## 优化后的完整流程 ``` Kafka Source ↓ KeyBy(文件类型) ↓ Async I/O (调用 parse-service 池) ↓ Map (调用 embedding-api) ↓ Sink (ES / Kafka / 数据库) ``` --- ## 相关文件 - `schedule-flink/src/main/java/com/yusys/flink/RealFlinkJob.java` - `schedule-flink/src/main/java/com/yusys/flink/PureJavaTest.java`