01-flink-optimization.md 2.2 KB

Flink 优化任务

当前状态

  • ✅ RealFlinkJob 基础版本已完成
  • ✅ 纯 MapFunction 实现
  • ❌ 使用 Async I/O(当前串行调用)
  • ❌ 从 Kafka 读取任务(当前硬编码测试数据)
  • ❌ 状态管理和容错

待优化项

1. 改用 Async I/O 调用 parse-service

优先级:高

原因

  • 当前 MapFunction 是串行的,效率低
  • Async I/O 可以并发调用多个 parse-service 实例
  • 支持超时和重试

实现要点

// 替换 .map() 为 AsyncDataStream
AsyncDataStream.unorderedWait(
    taskStream,
    new ParseAsyncFunction(parseServiceUrl),
    60, TimeUnit.SECONDS,   // 超时时间
    100                      // 最大并发请求数
)

参考文件

  • RealFlinkJob.java
  • 新建 ParseAsyncFunction.java

2. 从 Kafka 读取任务(替代硬编码测试数据)

优先级:高

原因

  • 真实生产环境是从 Kafka 消费任务
  • 当前是 env.fromElements(...) 硬编码

实现要点

KafkaSource<ParseTask> source = KafkaSource.<ParseTask>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("parse-task-topic")
    .setGroupId("flink-parse-group")
    .setValueOnlyDeserializer(new ParseTaskDeserializer())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

参考文件

  • schedule-producer 看消息格式
  • schedule-consumer 看消费逻辑

3. 实现容错和状态管理

优先级:中

内容

  • Exactly-Once 语义配置
  • Checkpoint 配置(生产环境)
  • 失败重试策略
  • 死信队列(处理失败的任务)

4. KeyBy 按文件类型分组(优化局部性)

优先级:低

原因

  • 相同类型的文件可能在同一个 parse-service 实例缓存更好
  • 可以针对不同文件类型调优不同的并发度

优化后的完整流程

Kafka Source
    ↓
KeyBy(文件类型)
    ↓
Async I/O (调用 parse-service 池)
    ↓
Map (调用 embedding-api)
    ↓
Sink (ES / Kafka / 数据库)

相关文件

  • schedule-flink/src/main/java/com/yusys/flink/RealFlinkJob.java
  • schedule-flink/src/main/java/com/yusys/flink/PureJavaTest.java