非结构解析四级调度模块

zsh 70db66081b 初始化项目,搭建springboot连接kafka基础代码 há 1 mês atrás
.idea 70db66081b 初始化项目,搭建springboot连接kafka基础代码 há 1 mês atrás
schedule-consumer 70db66081b 初始化项目,搭建springboot连接kafka基础代码 há 1 mês atrás
schedule-producer 70db66081b 初始化项目,搭建springboot连接kafka基础代码 há 1 mês atrás
.gitignore 70db66081b 初始化项目,搭建springboot连接kafka基础代码 há 1 mês atrás
README.md 70db66081b 初始化项目,搭建springboot连接kafka基础代码 há 1 mês atrás
pom.xml 70db66081b 初始化项目,搭建springboot连接kafka基础代码 há 1 mês atrás

README.md

四级调度系统 (Four-Level Schedule System)

基于Spring Boot和Kafka的分布式消息调度系统,实现了生产者-消费者模式的消息传递与处理。

项目概述

本项目是一个基于Kafka消息队列的分布式系统,包含两个核心模块:

  • schedule-producer: 消息生产者模块,负责生成和发送消息到Kafka主题
  • schedule-consumer: 消息消费者模块,负责监听和处理Kafka主题中的消息

系统支持多主题消息传递、消息重试机制、死信队列处理等功能,适用于需要高可靠性和可扩展性的消息处理场景。

技术栈

  • Java: 1.8
  • Spring Boot: 2.7.18
  • Spring Kafka: 2.8.11
  • Kafka: Apache Kafka
  • 构建工具: Maven
  • 其他依赖:
    • FastJSON 1.2.83
    • Lombok 1.18.30

项目结构

four-level-schedule/
├── pom.xml                          # 父级POM文件
├── schedule-producer/               # 生产者模块
│   ├── pom.xml
│   └── src/main/
│       ├── java/cn/com/yusys/producer/
│       │   ├── ProducerApplication.java
│       │   ├── config/
│       │   │   └── KafkaProducerConfig.java
│       │   ├── controller/
│       │   │   └── ProducerController.java
│       │   └── service/
│       │       └── MessageSender.java
│       └── resources/
│           └── application.yml
└── schedule-consumer/               # 消费者模块
    ├── pom.xml
    └── src/main/
        ├── java/cn/com/yusys/consumer/
        │   ├── ConsumerApplication.java
        │   ├── config/
        │   │   └── KafkaConsumerConfig.java
        │   └── listener/
        │       └── MessageListener.java
        └── resources/
            └── application.yml

模块说明

1. schedule-producer (生产者模块)

生产者模块提供RESTful API接口,用于向Kafka主题发送消息。

主要功能:

  • 支持向指定主题发送单条消息
  • 支持批量向多个主题发送消息
  • 消息发送回调处理(成功/失败日志记录)
  • 消息键值对支持

API接口:

  • GET /api/send: 向指定主题发送单条消息

    • 参数: topic(主题名), msg(消息内容), key(消息键,可选)
    • 返回: 发送状态和消息信息
  • GET /api/send-batch: 批量向多个主题发送消息

    • 参数: msg(消息内容,默认为"batch-msg")
    • 返回: 批量发送状态

配置说明:

  • 服务端口: 8081
  • Kafka服务器: localhost:9092
  • 生产者配置: acks=all, retries=3

2. schedule-consumer (消费者模块)

消费者模块监听Kafka主题并处理消息,具备错误处理和重试机制。

主要功能:

  • 多主题监听(yusp-topic-A, yusp-topic-B, yusp-topic-C)
  • 手动提交偏移量
  • 消息重试机制(最多重试3次)
  • 死信队列处理(重试失败后发送到死信主题)
  • 业务异常模拟(当消息内容为"error"时抛出异常)

配置说明:

  • 服务端口: 8082
  • Kafka服务器: localhost:9092
  • 消费者组: yusp-multi-topic-group
  • 自动提交: false(手动提交)
  • 偏移重置策略: earliest
  • 并发消费数: 3

快速开始

前置条件

  1. 安装JDK 1.8或更高版本
  2. 安装Maven 3.6或更高版本
  3. 安装并启动Apache Kafka(默认配置: localhost:9092)

使用示例

  1. 发送单条消息:

    curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=Hello%20World&key=test-key"
    
  2. 批量发送消息:

    curl "http://localhost:8081/api/send-batch?msg=Batch%20Message"
    
  3. 测试错误处理(会触发重试和死信队列):

    curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=error"
    

配置说明

生产者配置 (schedule-producer/src/main/resources/application.yml)

server:
  port: 8081

spring:
  application:
    name: schedule-producer
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3

消费者配置 (schedule-consumer/src/main/resources/application.yml)

server:
  port: 8082

spring:
  application:
    name: yusp-kafka-consumer
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: yusp-multi-topic-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

kafka:
  topics:
    listen: yusp-topic-A,yusp-topic-B,yusp-topic-C

核心功能说明

消息重试机制

消费者模块实现了消息重试机制,当消息处理失败时:

  1. 自动重试最多3次,每次间隔1秒
  2. 重试失败后,消息被发送到死信主题(原主题名后加.DLT)
  3. 死信主题中的消息需要人工干预或特殊处理

手动提交偏移量

消费者采用手动提交模式,确保消息处理成功后才提交偏移量,避免消息丢失。

多主题监听

消费者可以同时监听多个主题,根据配置文件中的kafka.topics.listen参数动态配置。

构建与部署

构建可执行JAR

# 构建生产者JAR
cd schedule-producer
mvn clean package

# 构建消费者JAR
cd schedule-consumer
mvn clean package

生成的JAR文件位于:

  • 生产者: schedule-producer/target/schedule-producer.jar
  • 消费者: schedule-consumer/target/schedule-consumer.jar

注意事项

  1. 确保Kafka服务已正确启动并可访问
  2. 生产环境和测试环境应使用不同的Kafka集群配置
  3. 根据实际需求调整重试次数和间隔时间
  4. 监控死信主题,及时处理失败消息
  5. 在生产环境中,建议配置Kafka的安全认证机制

故障排查

消费者无法连接Kafka

  • 检查Kafka服务是否启动
  • 确认bootstrap-servers配置正确
  • 检查网络连接和防火墙设置

消息处理失败

  • 查看消费者日志,确认错误原因
  • 检查死信主题中的失败消息
  • 调整业务逻辑或增加重试次数

消息丢失

  • 确认消费者配置中enable-auto-commit为false
  • 确保生产者配置中acks为all
  • 检查Kafka的副本因子配置

许可证

本项目采用内部许可证,仅供学习和研究使用。

联系方式

如有问题或建议,请联系项目维护团队。