|
|
hace 1 mes | |
|---|---|---|
| .idea | hace 1 mes | |
| schedule-consumer | hace 1 mes | |
| schedule-producer | hace 1 mes | |
| .gitignore | hace 1 mes | |
| README.md | hace 1 mes | |
| pom.xml | hace 1 mes |
基于Spring Boot和Kafka的分布式消息调度系统,实现了生产者-消费者模式的消息传递与处理。
本项目是一个基于Kafka消息队列的分布式系统,包含两个核心模块:
系统支持多主题消息传递、消息重试机制、死信队列处理等功能,适用于需要高可靠性和可扩展性的消息处理场景。
four-level-schedule/
├── pom.xml # 父级POM文件
├── schedule-producer/ # 生产者模块
│ ├── pom.xml
│ └── src/main/
│ ├── java/cn/com/yusys/producer/
│ │ ├── ProducerApplication.java
│ │ ├── config/
│ │ │ └── KafkaProducerConfig.java
│ │ ├── controller/
│ │ │ └── ProducerController.java
│ │ └── service/
│ │ └── MessageSender.java
│ └── resources/
│ └── application.yml
└── schedule-consumer/ # 消费者模块
├── pom.xml
└── src/main/
├── java/cn/com/yusys/consumer/
│ ├── ConsumerApplication.java
│ ├── config/
│ │ └── KafkaConsumerConfig.java
│ └── listener/
│ └── MessageListener.java
└── resources/
└── application.yml
生产者模块提供RESTful API接口,用于向Kafka主题发送消息。
主要功能:
API接口:
GET /api/send: 向指定主题发送单条消息
GET /api/send-batch: 批量向多个主题发送消息
配置说明:
消费者模块监听Kafka主题并处理消息,具备错误处理和重试机制。
主要功能:
配置说明:
发送单条消息:
curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=Hello%20World&key=test-key"
批量发送消息:
curl "http://localhost:8081/api/send-batch?msg=Batch%20Message"
测试错误处理(会触发重试和死信队列):
curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=error"
server:
port: 8081
spring:
application:
name: schedule-producer
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
server:
port: 8082
spring:
application:
name: yusp-kafka-consumer
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: yusp-multi-topic-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
topics:
listen: yusp-topic-A,yusp-topic-B,yusp-topic-C
消费者模块实现了消息重试机制,当消息处理失败时:
消费者采用手动提交模式,确保消息处理成功后才提交偏移量,避免消息丢失。
消费者可以同时监听多个主题,根据配置文件中的kafka.topics.listen参数动态配置。
# 构建生产者JAR
cd schedule-producer
mvn clean package
# 构建消费者JAR
cd schedule-consumer
mvn clean package
生成的JAR文件位于:
schedule-producer/target/schedule-producer.jarschedule-consumer/target/schedule-consumer.jarenable-auto-commit为falseacks为all本项目采用内部许可证,仅供学习和研究使用。
如有问题或建议,请联系项目维护团队。