Преглед изворни кода

初始化项目,搭建springboot连接kafka基础代码

zsh пре 4 недеља
комит
70db66081b

+ 39 - 0
.gitignore

@@ -0,0 +1,39 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+.kotlin
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store

+ 10 - 0
.idea/.gitignore

@@ -0,0 +1,10 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 已忽略包含查询文件的默认文件夹
+/queries/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/

+ 11 - 0
.idea/encodings.xml

@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Encoding">
+    <file url="file://$PROJECT_DIR$/schedule-consumer/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-consumer/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-producer/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-producer/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
+  </component>
+</project>

+ 14 - 0
.idea/misc.xml

@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="MavenProjectsManager">
+    <option name="originalFiles">
+      <list>
+        <option value="$PROJECT_DIR$/pom.xml" />
+      </list>
+    </option>
+  </component>
+  <component name="ProjectRootManager" version="2" project-jdk-name="corretto-1.8" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 237 - 0
README.md

@@ -0,0 +1,237 @@
+# 四级调度系统 (Four-Level Schedule System)
+
+基于Spring Boot和Kafka的分布式消息调度系统,实现了生产者-消费者模式的消息传递与处理。
+
+## 项目概述
+
+本项目是一个基于Kafka消息队列的分布式系统,包含两个核心模块:
+- **schedule-producer**: 消息生产者模块,负责生成和发送消息到Kafka主题
+- **schedule-consumer**: 消息消费者模块,负责监听和处理Kafka主题中的消息
+
+系统支持多主题消息传递、消息重试机制、死信队列处理等功能,适用于需要高可靠性和可扩展性的消息处理场景。
+
+## 技术栈
+
+- **Java**: 1.8
+- **Spring Boot**: 2.7.18
+- **Spring Kafka**: 2.8.11
+- **Kafka**: Apache Kafka
+- **构建工具**: Maven
+- **其他依赖**:
+  - FastJSON 1.2.83
+  - Lombok 1.18.30
+
+## 项目结构
+
+```
+four-level-schedule/
+├── pom.xml                          # 父级POM文件
+├── schedule-producer/               # 生产者模块
+│   ├── pom.xml
+│   └── src/main/
+│       ├── java/cn/com/yusys/producer/
+│       │   ├── ProducerApplication.java
+│       │   ├── config/
+│       │   │   └── KafkaProducerConfig.java
+│       │   ├── controller/
+│       │   │   └── ProducerController.java
+│       │   └── service/
+│       │       └── MessageSender.java
+│       └── resources/
+│           └── application.yml
+└── schedule-consumer/               # 消费者模块
+    ├── pom.xml
+    └── src/main/
+        ├── java/cn/com/yusys/consumer/
+        │   ├── ConsumerApplication.java
+        │   ├── config/
+        │   │   └── KafkaConsumerConfig.java
+        │   └── listener/
+        │       └── MessageListener.java
+        └── resources/
+            └── application.yml
+```
+
+## 模块说明
+
+### 1. schedule-producer (生产者模块)
+
+生产者模块提供RESTful API接口,用于向Kafka主题发送消息。
+
+**主要功能**:
+- 支持向指定主题发送单条消息
+- 支持批量向多个主题发送消息
+- 消息发送回调处理(成功/失败日志记录)
+- 消息键值对支持
+
+**API接口**:
+- `POST /api/send`: 向指定主题发送单条消息
+  - 请求体(JSON): topic(主题名,必填), message(消息内容), key(消息键,可选)
+  - 返回: 发送状态和消息信息
+
+- `POST /api/send-batch`: 批量向多个主题发送消息
+  - 查询参数: msg(消息内容,默认为"batch-msg")
+  - 返回: 批量发送状态
+
+**配置说明**:
+- 服务端口: 8081
+- Kafka服务器: localhost:9092
+- 生产者配置: acks=all, retries=3
+
+### 2. schedule-consumer (消费者模块)
+
+消费者模块监听Kafka主题并处理消息,具备错误处理和重试机制。
+
+**主要功能**:
+- 多主题监听(yusp-topic-A, yusp-topic-B, yusp-topic-C)
+- 手动提交偏移量
+- 消息重试机制(最多重试3次)
+- 死信队列处理(重试失败后发送到死信主题)
+- 业务异常模拟(当消息内容为"error"时抛出异常)
+
+**配置说明**:
+- 服务端口: 8082
+- Kafka服务器: localhost:9092
+- 消费者组: yusp-multi-topic-group
+- 自动提交: false(手动提交)
+- 偏移重置策略: earliest
+- 并发消费数: 3
+
+## 快速开始
+
+### 前置条件
+
+1. 安装JDK 1.8或更高版本
+2. 安装Maven 3.6或更高版本
+3. 安装并启动Apache Kafka(默认配置: localhost:9092)
+
+
+
+### 使用示例
+
+1. 发送单条消息:
+```bash
+curl -X POST http://localhost:8081/api/send \
+  -H "Content-Type: application/json" \
+  -d '{"topic":"yusp-topic-A","key":"test-key","message":"Hello World"}'
+```
+
+2. 批量发送消息:
+```bash
+curl -X POST "http://localhost:8081/api/send-batch?msg=Batch-Message"
+```
+
+3. 测试错误处理(会触发重试和死信队列):
+```bash
+curl -X POST http://localhost:8081/api/send \
+  -H "Content-Type: application/json" \
+  -d '{"topic":"yusp-topic-A","message":"error"}'
+```
+
+## 配置说明
+
+### 生产者配置 (schedule-producer/src/main/resources/application.yml)
+
+```yaml
+server:
+  port: 8081
+
+spring:
+  application:
+    name: schedule-producer
+  kafka:
+    bootstrap-servers: localhost:9092
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      acks: all
+      retries: 3
+```
+
+### 消费者配置 (schedule-consumer/src/main/resources/application.yml)
+
+```yaml
+server:
+  port: 8082
+
+spring:
+  application:
+    name: yusp-kafka-consumer
+  kafka:
+    bootstrap-servers: localhost:9092
+    consumer:
+      group-id: yusp-multi-topic-group
+      enable-auto-commit: false
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+
+kafka:
+  topics:
+    listen: yusp-topic-A,yusp-topic-B,yusp-topic-C
+```
+
+## 核心功能说明
+
+### 消息重试机制
+
+消费者模块实现了消息重试机制,当消息处理失败时:
+1. 自动重试最多3次,每次间隔1秒
+2. 重试失败后,消息被发送到死信主题(原主题名后加.DLT)
+3. 死信主题中的消息需要人工干预或特殊处理
+
+### 手动提交偏移量
+
+消费者采用手动提交模式,确保消息处理成功后才提交偏移量,避免消息丢失。
+
+### 多主题监听
+
+消费者可以同时监听多个主题,根据配置文件中的`kafka.topics.listen`参数动态配置。
+
+## 构建与部署
+
+### 构建可执行JAR
+
+```bash
+# 构建生产者JAR
+cd schedule-producer
+mvn clean package
+
+# 构建消费者JAR
+cd schedule-consumer
+mvn clean package
+```
+
+生成的JAR文件位于:
+- 生产者: `schedule-producer/target/schedule-producer.jar`
+- 消费者: `schedule-consumer/target/schedule-consumer.jar`
+
+
+## 注意事项
+
+1. 确保Kafka服务已正确启动并可访问
+2. 生产环境和测试环境应使用不同的Kafka集群配置
+3. 根据实际需求调整重试次数和间隔时间
+4. 监控死信主题,及时处理失败消息
+5. 在生产环境中,建议配置Kafka的安全认证机制
+
+## 故障排查
+
+### 消费者无法连接Kafka
+- 检查Kafka服务是否启动
+- 确认bootstrap-servers配置正确
+- 检查网络连接和防火墙设置
+
+### 消息处理失败
+- 查看消费者日志,确认错误原因
+- 检查死信主题中的失败消息
+- 调整业务逻辑或增加重试次数
+
+### 消息丢失
+- 确认消费者配置中`enable-auto-commit`为false
+- 确保生产者配置中`acks`为all
+- 检查Kafka的副本因子配置
+
+## 许可证
+
+本项目采用内部许可证,仅供学习和研究使用。
+
+## 联系方式
+
+如有问题或建议,请联系项目维护团队。

+ 79 - 0
pom.xml

@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>four-level-schedule</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <modules>
        <module>schedule-producer</module>
        <module>schedule-consumer</module>
    </modules>

    <!-- Centralised version/property management for all child modules. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>1.8</java.version>
        <!-- Declared exactly once (was previously duplicated). -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Latest Spring Boot release on the 2.7.x line (Spring Framework 5.3.x). -->
        <spring-boot.version>2.7.18</spring-boot.version>
        <!-- Spring Kafka line matching Boot 2.7.x. -->
        <spring-kafka.version>2.8.11</spring-kafka.version>
        <fastjson.version>1.2.83</fastjson.version>
        <lombok.version>1.18.30</lombok.version>
    </properties>

    <!-- Dependency management: child modules inherit versions and omit them. -->
    <dependencyManagement>
        <dependencies>
            <!-- Spring Boot BOM import. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Kafka pinned explicitly to match Boot 2.7.x. -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>${spring-kafka.version}</version>
            </dependency>

            <!-- Shared utility dependencies. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
                <optional>true</optional>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
+

+ 58 - 0
schedule-consumer/pom.xml

@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.example</groupId>
+        <artifactId>four-level-schedule</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>schedule-consumer</artifactId>
+    <packaging>jar</packaging>
+    <name>Schedule Consumer</name>
+
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <!-- Web 依赖保留用于 Actuator 或健康检查,虽主要逻辑是监听 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>schedule-consumer</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 11 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/ConsumerApplication.java

@@ -0,0 +1,11 @@
package cn.com.yusys.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point for the schedule-consumer module.
 * Booting the Spring context registers the Kafka listener containers
 * configured elsewhere in this module.
 */
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

+ 98 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java

@@ -0,0 +1,98 @@
+package cn.com.yusys.consumer.config;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.listener.*;
+import org.springframework.util.backoff.FixedBackOff;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class KafkaConsumerConfig {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.consumer.group-id}")
+    private String groupId;
+
+    // 1. 死信生产者
+    @Bean
+    public ProducerFactory<String, String> dltProducerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        return new DefaultKafkaProducerFactory<>(props);
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> dltKafkaTemplate() {
+        return new KafkaTemplate<>(dltProducerFactory());
+    }
+
+    // 2. 死信恢复器
+    @Bean
+    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
+        return new DeadLetterPublishingRecoverer(dltKafkaTemplate());
+    }
+
+    // 3. 错误处理器 (重试 3 次 -> 死信)
+    @Bean
+    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
+        long interval = 1000L;
+        long maxAttempts = 3L;
+
+        // 构造函数传入恢复器和退避策略
+        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(interval, maxAttempts));
+
+
+        return handler;
+    }
+
+    // 4. 消费者工厂
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 必须 false
+
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    // 5. 监听器工厂 (整合 Ack 模式和 ErrorHandler)
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
+            DefaultErrorHandler errorHandler) {
+
+        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        factory.setConcurrency(3);
+
+        // 【核心】注入错误处理器
+        factory.setCommonErrorHandler(errorHandler);
+
+        // 【核心】设置手动提交模式
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
+
+        return factory;
+    }
+}

+ 44 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -0,0 +1,44 @@
+package cn.com.yusys.consumer.listener;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageListener {
+    private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
+
+    // 从配置读取逗号分隔的 Topic 字符串
+    @Value("${kafka.topics.listen}")
+    private String topicsConfig;
+
+    /**
+     * 监听多个 Topic
+     * 注意:同一个 groupId 下的不同消费者实例会共同负载均衡消费这些 Topic 的所有分区。
+     */
+    @KafkaListener(topics = "${kafka.topics.listen}", groupId = "${spring.kafka.consumer.group-id}")
+    public void listen(String message, ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
+        // 打印当前消息来自哪个 Topic,方便区分
+        log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
+                record.topic(), record.key(), message, record.offset());
+
+        processBusinessLogic(message);
+        acknowledgment.acknowledge();
+
+    }
+
+    private void processBusinessLogic(String message) {
+        if ("error".equals(message)) {
+            throw new RuntimeException("Simulated Business Exception");
+        }
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}

+ 19 - 0
schedule-consumer/src/main/resources/application.yml

@@ -0,0 +1,19 @@
+server:
+  port: 8082
+
+spring:
+  application:
+    name: yusp-kafka-consumer
+  kafka:
+    bootstrap-servers: 10.192.72.13:9092
+    consumer:
+      group-id: yusp-topic-group
+      enable-auto-commit: false
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+
+# 可选:在配置中定义 Topic 列表,方便管理
+kafka:
+  topics:
+    listen: test-topic

+ 59 - 0
schedule-producer/pom.xml

@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.example</groupId>
+        <artifactId>four-level-schedule</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>schedule-producer</artifactId>
+    <packaging>jar</packaging>
+    <name>Schedule Producer</name>
+
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>schedule-producer</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 11 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/ProducerApplication.java

@@ -0,0 +1,11 @@
package cn.com.yusys.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point for the schedule-producer module.
 * Boots the Spring context exposing the REST endpoints that publish
 * messages to Kafka.
 */
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

+ 37 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/config/KafkaProducerConfig.java

@@ -0,0 +1,37 @@
+package cn.com.yusys.producer.config;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class KafkaProducerConfig {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        Map<String, Object> configProps = new HashMap<>();
+        System.out.println(bootstrapServers);
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
+        return new DefaultKafkaProducerFactory<>(configProps);
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+}

+ 58 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/controller/ProducerController.java

@@ -0,0 +1,58 @@
+package cn.com.yusys.producer.controller;
+
+import cn.com.yusys.producer.model.Message;
+import cn.com.yusys.producer.service.MessageSender;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/api")
+public class ProducerController {
+
+    @Resource
+    private MessageSender messageSender;
+
+    /**
+     * 通用发送接口
+     */
+    @PostMapping("/send")
+    public Map<String, Object> sendMsg(@RequestBody Message message) {
+
+        String topic = message.getTopic();
+        String key = message.getKey();
+        String msg = message.getMessage();
+
+        // 简单校验 Topic 名称
+        if (topic == null || topic.trim().isEmpty()) {
+            throw new IllegalArgumentException("Topic name cannot be empty");
+        }
+
+        messageSender.send(topic, key, msg);
+
+        Map<String, Object> res = new HashMap<>();
+        res.put("status", "success");
+        res.put("topic", topic);
+        res.put("msg", "Message queued");
+        return res;
+    }
+
+    /**
+     * 批量测试接口:一次性向多个 Topic 发送消息
+     */
+    @PostMapping("/send-batch")
+    public Map<String, Object> sendBatch(@RequestParam(defaultValue = "batch-msg") String msg) {
+        String[] topics = {"yusp-topic-A", "yusp-topic-B", "yusp-topic-C"};
+
+        for (String topic : topics) {
+            messageSender.send(topic, "batch-key", msg + "-for-" + topic);
+        }
+
+        Map<String, Object> res = new HashMap<>();
+        res.put("status", "success");
+        res.put("msg", "Messages sent to multiple topics: " + String.join(", ", topics));
+        return res;
+    }
+}

+ 38 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/model/Message.java

@@ -0,0 +1,38 @@
+package cn.com.yusys.producer.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Kafka 消息发送请求参数对象
+ * 对应前端/调用方传入的 JSON 结构
+ */
+@Data
+public class Message  {
+
+
+    /**
+     * 目标 Topic 名称 (必填)
+     * 支持动态指定,如 "yusp-topic-A"
+     */
+    private String topic;
+
+    /**
+     * 消息 Key (可选)
+     * 用于分区策略,如果为 null,Kafka 会使用轮询或随机策略
+     */
+    private String key;
+
+    /**
+     * 消息体内容 (必填)
+     * 可以是纯文本,也可以是 JSON 字符串
+     */
+    private String message;
+
+
+
+}

+ 46 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/service/MessageSender.java

@@ -0,0 +1,46 @@
+package cn.com.yusys.producer.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.annotation.Resource;
+
+@Service
+public class MessageSender {
+    private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
+
+    @Resource
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    /**
+     * 向指定 Topic 发送消息
+     * @param topic 目标 Topic 名称
+     * @param key 消息键
+     * @param message 消息内容
+     */
+    public void send(String topic, String key, String message) {
+        log.info("Sending message to Topic [{}]: Key={}, Msg={}", topic, key, message);
+
+        // send 方法第一个参数即为 Topic
+        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
+
+        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
+            @Override
+            public void onSuccess(SendResult<String, String> result) {
+                log.info("Sent successfully to Topic [{}]. Offset: {}",
+                        result.getRecordMetadata().topic(),
+                        result.getRecordMetadata().offset());
+            }
+
+            @Override
+            public void onFailure(Throwable ex) {
+                log.error("Send failed to Topic [{}]", topic, ex);
+            }
+        });
+    }
+}

+ 18 - 0
schedule-producer/src/main/resources/application.yml

@@ -0,0 +1,18 @@
+server:
+  port: 8081
+
+spring:
+  application:
+    name: schedule-producer
+  kafka:
+    bootstrap-servers: 10.192.72.13:9092
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      acks: all
+      retries: 3
+    topics:
+      test-topic:
+        partitions: 3
+
+