# 四级调度系统 (Four-Level Schedule System) 基于Spring Boot和Kafka的分布式消息调度系统,实现了生产者-消费者模式的消息传递与处理。 ## 项目概述 本项目是一个基于Kafka消息队列的分布式系统,包含两个核心模块: - **schedule-producer**: 消息生产者模块,负责生成和发送消息到Kafka主题 - **schedule-consumer**: 消息消费者模块,负责监听和处理Kafka主题中的消息 系统支持多主题消息传递、消息重试机制、死信队列处理等功能,适用于需要高可靠性和可扩展性的消息处理场景。 ## 技术栈 - **Java**: 1.8 - **Spring Boot**: 2.7.18 - **Spring Kafka**: 2.8.11 - **Kafka**: Apache Kafka - **构建工具**: Maven - **其他依赖**: - FastJSON 1.2.83 - Lombok 1.18.30 ## 项目结构 ``` four-level-schedule/ ├── pom.xml # 父级POM文件 ├── schedule-producer/ # 生产者模块 │ ├── pom.xml │ └── src/main/ │ ├── java/cn/com/yusys/producer/ │ │ ├── ProducerApplication.java │ │ ├── config/ │ │ │ └── KafkaProducerConfig.java │ │ ├── controller/ │ │ │ └── ProducerController.java │ │ └── service/ │ │ └── MessageSender.java │ └── resources/ │ └── application.yml └── schedule-consumer/ # 消费者模块 ├── pom.xml └── src/main/ ├── java/cn/com/yusys/consumer/ │ ├── ConsumerApplication.java │ ├── config/ │ │ └── KafkaConsumerConfig.java │ └── listener/ │ └── MessageListener.java └── resources/ └── application.yml ``` ## 模块说明 ### 1. schedule-producer (生产者模块) 生产者模块提供RESTful API接口,用于向Kafka主题发送消息。 **主要功能**: - 支持向指定主题发送单条消息 - 支持批量向多个主题发送消息 - 消息发送回调处理(成功/失败日志记录) - 消息键值对支持 **API接口**: - `GET /api/send`: 向指定主题发送单条消息 - 参数: topic(主题名), msg(消息内容), key(消息键,可选) - 返回: 发送状态和消息信息 - `GET /api/send-batch`: 批量向多个主题发送消息 - 参数: msg(消息内容,默认为"batch-msg") - 返回: 批量发送状态 **配置说明**: - 服务端口: 8081 - Kafka服务器: localhost:9092 - 生产者配置: acks=all, retries=3 ### 2. schedule-consumer (消费者模块) 消费者模块监听Kafka主题并处理消息,具备错误处理和重试机制。 **主要功能**: - 多主题监听(yusp-topic-A, yusp-topic-B, yusp-topic-C) - 手动提交偏移量 - 消息重试机制(最多重试3次) - 死信队列处理(重试失败后发送到死信主题) - 业务异常模拟(当消息内容为"error"时抛出异常) **配置说明**: - 服务端口: 8082 - Kafka服务器: localhost:9092 - 消费者组: yusp-multi-topic-group - 自动提交: false(手动提交) - 偏移重置策略: earliest - 并发消费数: 3 ## 快速开始 ### 前置条件 1. 安装JDK 1.8或更高版本 2. 安装Maven 3.6或更高版本 3. 安装并启动Apache Kafka(默认配置: localhost:9092) ### 使用示例 1. 发送单条消息: ```bash curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=Hello%20World&key=test-key" ``` 2. 批量发送消息: ```bash curl "http://localhost:8081/api/send-batch?msg=Batch%20Message" ``` 3. 测试错误处理(会触发重试和死信队列): ```bash curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=error" ``` ## 配置说明 ### 生产者配置 (schedule-producer/src/main/resources/application.yml) ```yaml server: port: 8081 spring: application: name: schedule-producer kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all retries: 3 ``` ### 消费者配置 (schedule-consumer/src/main/resources/application.yml) ```yaml server: port: 8082 spring: application: name: yusp-kafka-consumer kafka: bootstrap-servers: localhost:9092 consumer: group-id: yusp-multi-topic-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer kafka: topics: listen: yusp-topic-A,yusp-topic-B,yusp-topic-C ``` ## 核心功能说明 ### 消息重试机制 消费者模块实现了消息重试机制,当消息处理失败时: 1. 自动重试最多3次,每次间隔1秒 2. 重试失败后,消息被发送到死信主题(原主题名后加.DLT) 3. 死信主题中的消息需要人工干预或特殊处理 ### 手动提交偏移量 消费者采用手动提交模式,确保消息处理成功后才提交偏移量,避免消息丢失。 ### 多主题监听 消费者可以同时监听多个主题,根据配置文件中的`kafka.topics.listen`参数动态配置。 ## 构建与部署 ### 构建可执行JAR ```bash # 构建生产者JAR cd schedule-producer mvn clean package # 构建消费者JAR cd schedule-consumer mvn clean package ``` 生成的JAR文件位于: - 生产者: `schedule-producer/target/schedule-producer.jar` - 消费者: `schedule-consumer/target/schedule-consumer.jar` ## 注意事项 1. 确保Kafka服务已正确启动并可访问 2. 生产环境和测试环境应使用不同的Kafka集群配置 3. 根据实际需求调整重试次数和间隔时间 4. 监控死信主题,及时处理失败消息 5. 在生产环境中,建议配置Kafka的安全认证机制 ## 故障排查 ### 消费者无法连接Kafka - 检查Kafka服务是否启动 - 确认bootstrap-servers配置正确 - 检查网络连接和防火墙设置 ### 消息处理失败 - 查看消费者日志,确认错误原因 - 检查死信主题中的失败消息 - 调整业务逻辑或增加重试次数 ### 消息丢失 - 确认消费者配置中`enable-auto-commit`为false - 确保生产者配置中`acks`为all - 检查Kafka的副本因子配置 ## 许可证 本项目采用内部许可证,仅供学习和研究使用。 ## 联系方式 如有问题或建议,请联系项目维护团队。