Просмотр исходного кода

Merge remote and add parse service

zsh 3 недели назад
Родитель
Commit
f61a7a5f0d
36 измененных файлов с 2035 добавлено и 1 удалено
  1. 41 1
      .gitignore
  2. 10 0
      .idea/.gitignore
  3. 13 0
      .idea/encodings.xml
  4. 8 0
      .idea/inspectionProfiles/Project_Default.xml
  5. 14 0
      .idea/misc.xml
  6. 6 0
      .idea/vcs.xml
  7. 208 0
      README.md
  8. 80 0
      pom.xml
  9. 58 0
      schedule-consumer/pom.xml
  10. 11 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/ConsumerApplication.java
  11. 98 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java
  12. 44 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java
  13. 19 0
      schedule-consumer/src/main/resources/application.yml
  14. BIN
      schedule-manager/doc/解析服务管理流程图.png
  15. 95 0
      schedule-manager/pom.xml
  16. 13 0
      schedule-manager/src/main/java/cn/com/yusys/manager/ManagerApplication.java
  17. 21 0
      schedule-manager/src/main/java/cn/com/yusys/manager/common/ParseInstanceStatusRegistry.java
  18. 116 0
      schedule-manager/src/main/java/cn/com/yusys/manager/common/PortPool.java
  19. 28 0
      schedule-manager/src/main/java/cn/com/yusys/manager/config/DockerConfig.java
  20. 53 0
      schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java
  21. 93 0
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/DockerInstanceManager.java
  22. 7 0
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/InstanceManager.java
  23. 42 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatus.java
  24. 36 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatusResponse.java
  25. 390 0
      schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java
  26. 104 0
      schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java
  27. 62 0
      schedule-manager/src/main/resources/application.yml
  28. 41 0
      schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/DockerInstanceManagerTest.java
  29. 57 0
      schedule-manager/src/test/java/cn/com/yusys/manager/util/ParseInstanceClientTest.java
  30. 59 0
      schedule-producer/pom.xml
  31. 11 0
      schedule-producer/src/main/java/cn/com/yusys/producer/ProducerApplication.java
  32. 37 0
      schedule-producer/src/main/java/cn/com/yusys/producer/config/KafkaProducerConfig.java
  33. 58 0
      schedule-producer/src/main/java/cn/com/yusys/producer/controller/ProducerController.java
  34. 38 0
      schedule-producer/src/main/java/cn/com/yusys/producer/model/Message.java
  35. 46 0
      schedule-producer/src/main/java/cn/com/yusys/producer/service/MessageSender.java
  36. 18 0
      schedule-producer/src/main/resources/application.yml

+ 41 - 1
.gitignore

@@ -1,2 +1,42 @@
 parse/
-__pycache__/
+__pycache__/
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+.kotlin
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store

+ 10 - 0
.idea/.gitignore

@@ -0,0 +1,10 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 已忽略包含查询文件的默认文件夹
+/queries/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/

+ 13 - 0
.idea/encodings.xml

@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Encoding">
+    <file url="file://$PROJECT_DIR$/schedule-consumer/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-consumer/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-manager/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-manager/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-producer/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-producer/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
+  </component>
+</project>

+ 8 - 0
.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,8 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="AutoCloseableResource" enabled="true" level="WARNING" enabled_by_default="true">
+      <option name="METHOD_MATCHER_CONFIG" value="java.util.Formatter,format,java.io.Writer,append,com.google.common.base.Preconditions,checkNotNull,org.hibernate.Session,close,java.io.PrintWriter,printf,java.io.PrintStream,printf,java.lang.foreign.Arena,ofAuto,java.lang.foreign.Arena,global,cn.com.yusys.manager.parser.DockerInstanceManager,dockerClient" />
+    </inspection_tool>
+  </profile>
+</component>

+ 14 - 0
.idea/misc.xml

@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="MavenProjectsManager">
+    <option name="originalFiles">
+      <list>
+        <option value="$PROJECT_DIR$/pom.xml" />
+      </list>
+    </option>
+  </component>
+  <component name="ProjectRootManager" version="2" project-jdk-name="corretto-1.8" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 208 - 0
README.md

@@ -0,0 +1,208 @@
+# 四级调度系统 (Four-Level Schedule)
+
+## 项目简介
+
+四级调度系统是一个基于Spring Boot和Kafka的分布式任务调度系统,通过Producer-Consumer模式实现任务的生产与消费,并配备Manager模块进行实例管理和动态扩缩容。系统支持Docker容器化部署,能够根据任务积压情况和资源负载自动调整实例数量,实现高效的资源利用和任务处理。
+
+## 系统架构
+
+系统由三个核心模块组成:
+
+1. **schedule-producer**: 任务生产者模块,负责创建和发送任务到Kafka消息队列
+2. **schedule-consumer**: 任务消费者模块,负责从Kafka消费并处理任务
+3. **schedule-manager**: 管理模块,负责监控实例状态、管理Docker容器和动态扩缩容
+
+## 技术栈
+
+- Java 8
+- Spring Boot 2.7.18
+- Spring Kafka 2.8.11
+- Docker (通过Docker Java API)
+- Fastjson 1.2.83
+- Lombok 1.18.30
+
+## 模块说明
+
+### schedule-producer
+
+任务生产者模块,提供RESTful API用于创建和发送任务。
+
+**主要功能:**
+- 接收任务请求
+- 将任务序列化并发送到Kafka主题
+- 支持任务重试机制
+
+**配置说明:**
+- 服务端口: 8081
+- Kafka服务器: 10.192.72.13:9092
+- 默认主题: test-topic (6个分区)
+
+### schedule-consumer
+
+任务消费者模块,监听Kafka主题并消费任务。
+
+**主要功能:**
+- 监听Kafka主题
+- 消费并处理任务
+- 支持手动提交offset
+
+**配置说明:**
+- 服务端口: 8082
+- Kafka消费者组: yusp-topic-group
+- 监听主题: test-topic
+
+### schedule-manager
+
+管理模块,负责监控和管理解析服务实例。
+
+**主要功能:**
+- 监控实例健康状态
+- 管理Docker容器生命周期
+- 根据任务积压和资源负载动态扩缩容
+- GPU负载监控
+
+**配置说明:**
+- 服务端口: 8083
+- Docker守护进程地址: tcp://127.0.0.1:2375
+- 心跳超时阈值: 30000ms
+- 最小活跃实例数: 3
+- 最大活跃实例数: 10
+- 任务积压阈值: 100
+- GPU负载阈值: 80%
+
+## 环境要求
+
+- JDK 8+
+- Maven 3.6+
+- Docker (用于Manager模块)
+- Kafka 2.8+
+
+## 快速开始
+
+### 1. 克隆项目
+
+```bash
+git clone <repository-url>
+cd four-level-schedule
+```
+
+### 2. 配置Kafka连接
+
+修改各模块的`application.yml`文件,配置Kafka服务器地址:
+
+```yaml
+spring:
+  kafka:
+    bootstrap-servers: <your-kafka-server>:9092
+```
+
+### 3. 配置Docker连接
+
+修改`schedule-manager/src/main/resources/application.yml`,配置Docker守护进程地址:
+
+```yaml
+docker:
+  host: tcp://<your-docker-host>:2375
+```
+
+### 4. 编译项目
+
+```bash
+mvn clean package
+```
+
+### 5. 启动服务
+
+分别启动三个模块:
+
+```bash
+# 启动Producer
+java -jar schedule-producer/target/schedule-producer.jar
+
+# 启动Consumer
+java -jar schedule-consumer/target/schedule-consumer.jar
+
+# 启动Manager
+java -jar schedule-manager/target/schedule-manager.jar
+```
+
+## 使用指南
+
+### 发送任务
+
+通过Producer模块提供的RESTful API发送任务:
+
+```bash
+curl -X POST http://localhost:8081/api/task -H "Content-Type: application/json" -d '{"taskData": "your task data"}'
+```
+
+### 监控任务
+
+Manager模块会自动监控任务处理情况和实例状态,并根据配置的阈值进行扩缩容。
+
+## 配置说明
+
+### Kafka配置
+
+所有模块都使用相同的Kafka配置,包括bootstrap-servers、序列化器等。可根据实际环境调整。
+
+### Manager监控配置
+
+Manager模块提供了丰富的监控配置选项:
+
+- `heartbeat-timeout`: 心跳超时阈值
+- `min-active-instance`: 最小活跃实例数
+- `max-active-instance`: 最大活跃实例数
+- `task-backlog-threshold`: 任务积压阈值
+- `gpu-load-threshold`: GPU负载阈值
+
+## 项目结构
+
+```
+four-level-schedule/
+├── pom.xml                          # 父POM文件
+├── schedule-producer/               # 任务生产者模块
+│   ├── pom.xml
+│   └── src/
+│       ├── main/
+│       │   ├── java/
+│       │   └── resources/
+│       └── test/
+├── schedule-consumer/               # 任务消费者模块
+│   ├── pom.xml
+│   └── src/
+│       ├── main/
+│       │   ├── java/
+│       │   └── resources/
+│       └── test/
+└── schedule-manager/                # 管理模块
+    ├── pom.xml
+    ├── doc/                         # 文档目录
+    └── src/
+        ├── main/
+        │   ├── java/
+        │   └── resources/
+        └── test/
+```
+
+## 常见问题
+
+### 1. Kafka连接失败
+
+检查Kafka服务器地址和端口是否正确,确保Kafka服务正常运行。
+
+### 2. Docker连接失败
+
+检查Docker守护进程是否启用,确保Docker API端口(2375)已开放。
+
+### 3. 实例无法自动扩缩容
+
+检查Manager模块的监控配置是否合理,确保任务积压和资源负载达到阈值。
+
+## 许可证
+
+本项目采用内部许可证,未经授权不得用于商业用途。
+
+## 联系方式
+
+如有问题或建议,请联系项目维护者。

+ 80 - 0
pom.xml

@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>four-level-schedule</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <modules>
+        <module>schedule-producer</module>
+        <module>schedule-consumer</module>
+        <module>schedule-manager</module>
+    </modules>
+
+    <!-- 统一属性管理 -->
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <java.version>1.8</java.version>
+        <!-- 匹配 Spring Framework 5.3.33 的最新 Boot 版本 -->
+        <spring-boot.version>2.7.18</spring-boot.version>
+        <!-- 对应的 Spring Kafka 版本 -->
+        <spring-kafka.version>2.8.11</spring-kafka.version>
+        <fastjson.version>1.2.83</fastjson.version>
+        <lombok.version>1.18.30</lombok.version>
+    </properties>
+
+    <!-- 依赖管理:子模块引用时无需写版本号 -->
+    <dependencyManagement>
+        <dependencies>
+            <!-- Spring Boot Dependencies -->
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>${spring-boot.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+
+            <!-- Spring Kafka (显式指定版本以匹配 Boot 2.7.x) -->
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka</artifactId>
+                <version>${spring-kafka.version}</version>
+            </dependency>
+
+            <!-- 其他通用依赖 -->
+            <dependency>
+                <groupId>com.alibaba</groupId>
+                <artifactId>fastjson</artifactId>
+                <version>${fastjson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.projectlombok</groupId>
+                <artifactId>lombok</artifactId>
+                <version>${lombok.version}</version>
+                <optional>true</optional>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
+

+ 58 - 0
schedule-consumer/pom.xml

@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.example</groupId>
+        <artifactId>four-level-schedule</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>schedule-consumer</artifactId>
+    <packaging>jar</packaging>
+    <name>Schedule Consumer</name>
+
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <!-- Web 依赖保留用于 Actuator 或健康检查,虽主要逻辑是监听 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>schedule-consumer</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 11 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/ConsumerApplication.java

@@ -0,0 +1,11 @@
+package cn.com.yusys.consumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Entry point for the schedule-consumer module (Kafka task consumer).
+ */
+@SpringBootApplication
+public class ConsumerApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(ConsumerApplication.class, args);
+    }
+}

+ 98 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java

@@ -0,0 +1,98 @@
+package cn.com.yusys.consumer.config;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.listener.*;
+import org.springframework.util.backoff.FixedBackOff;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Kafka consumer configuration: manual-ack listener containers with a
+ * retry-then-dead-letter error handling pipeline.
+ */
+@Configuration
+public class KafkaConsumerConfig {
+
+    // NOTE(review): declared but never used in this class — consider removing.
+    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.consumer.group-id}")
+    private String groupId;
+
+    // 1. Dead-letter (DLT) producer factory, used to publish failed records.
+    @Bean
+    public ProducerFactory<String, String> dltProducerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        return new DefaultKafkaProducerFactory<>(props);
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> dltKafkaTemplate() {
+        return new KafkaTemplate<>(dltProducerFactory());
+    }
+
+    // 2. Dead-letter recoverer: routes exhausted records to the DLT.
+    @Bean
+    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
+        return new DeadLetterPublishingRecoverer(dltKafkaTemplate());
+    }
+
+    // 3. Error handler (retry 3 times -> dead-letter topic).
+    @Bean
+    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
+        long interval = 1000L;
+        long maxAttempts = 3L;
+
+        // FixedBackOff(interval, maxAttempts): up to 3 retries, 1s apart,
+        // after which the recoverer publishes the record to the DLT.
+        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(interval, maxAttempts));
+
+
+        return handler;
+    }
+
+    // 4. Consumer factory.
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // must be false: offsets are committed manually (AckMode.MANUAL below)
+
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    // 5. Listener container factory (wires together ack mode and the error handler).
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
+            DefaultErrorHandler errorHandler) {
+
+        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        factory.setConcurrency(3);
+
+        // [core] plug in the retry/dead-letter error handler
+        factory.setCommonErrorHandler(errorHandler);
+
+        // [core] require manual offset acknowledgment from the listener
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
+
+        return factory;
+    }
+}

+ 44 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -0,0 +1,44 @@
+package cn.com.yusys.consumer.listener;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+/**
+ * Kafka listener that consumes messages from the configured topic(s) and
+ * acknowledges offsets manually after successful processing.
+ */
+@Component
+public class MessageListener {
+    private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
+
+    // Comma-separated topic list read from configuration.
+    // NOTE(review): this field appears unused — the @KafkaListener below reads
+    // the property directly; consider removing it. TODO confirm.
+    @Value("${kafka.topics.listen}")
+    private String topicsConfig;
+
+    /**
+     * Listens on one or more topics.
+     * Note: consumer instances in the same groupId load-balance all partitions
+     * of these topics among themselves.
+     * If processing throws, acknowledge() is skipped and the container's
+     * error handler (retry -> dead-letter) takes over.
+     */
+    @KafkaListener(topics = "${kafka.topics.listen}", groupId = "${spring.kafka.consumer.group-id}")
+    public void listen(String message, ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
+        // Log which topic the message came from, to tell sources apart.
+        log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
+                record.topic(), record.key(), message, record.offset());
+
+        processBusinessLogic(message);
+        acknowledgment.acknowledge(); // manual offset commit (AckMode.MANUAL)
+
+    }
+
+    // Simulated business work; the literal payload "error" deliberately fails
+    // so the retry/dead-letter path can be exercised.
+    private void processBusinessLogic(String message) {
+        if ("error".equals(message)) {
+            throw new RuntimeException("Simulated Business Exception");
+        }
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}

+ 19 - 0
schedule-consumer/src/main/resources/application.yml

@@ -0,0 +1,19 @@
+server:
+  port: 8082
+
+spring:
+  application:
+    name: yusp-kafka-consumer
+  kafka:
+    bootstrap-servers: 10.192.72.13:9092
+    consumer:
+      group-id: yusp-topic-group
+      enable-auto-commit: false
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+
+# 可选:在配置中定义 Topic 列表,方便管理
+kafka:
+  topics:
+    listen: test-topic

BIN
schedule-manager/doc/解析服务管理流程图.png


+ 95 - 0
schedule-manager/pom.xml

@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.example</groupId>
+        <artifactId>four-level-schedule</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>cn.com.yusys.manager</groupId>
+    <artifactId>schedule-manager</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <packaging>jar</packaging>
+    <name>Schedule Manager</name>
+
+
+    <dependencies>
+        <!-- Web 依赖保留用于 Actuator 或健康检查,虽主要逻辑是监听 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+        <!-- Docker Java客户端 -->
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java</artifactId>
+            <version>3.3.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java-transport-httpclient5</artifactId>
+            <version>3.3.4</version>
+        </dependency>
+        <!-- Spring WebFlux(WebClient) -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+        </dependency>
+
+        <!-- SpringBoot测试核心依赖 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- Junit5(SpringBoot 2.4+默认集成) -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>schedule-manager</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 13 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/ManagerApplication.java

@@ -0,0 +1,13 @@
+package cn.com.yusys.manager;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * Entry point for the schedule-manager module.
+ * @EnableScheduling activates the periodic monitoring tasks defined elsewhere
+ * in this module.
+ */
+@SpringBootApplication
+@EnableScheduling
+public class ManagerApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(ManagerApplication.class, args);
+    }
+}

+ 21 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/common/ParseInstanceStatusRegistry.java

@@ -0,0 +1,21 @@
+package cn.com.yusys.manager.common;
+
+import cn.com.yusys.manager.model.InstanceStatus;
+import org.springframework.stereotype.Component;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of parse-service instance status.
+ * Maintains the status information of all parse-service instances.
+ */
+@Component
+public class ParseInstanceStatusRegistry {
+    // Holds the status of each parse-service instance.
+    // NOTE(review): the key's meaning (container id? instance name?) is not
+    // visible here — confirm against callers.
+    private final Map<String, InstanceStatus> activeInstancePool = new ConcurrentHashMap<>();
+
+
+    // NOTE(review): returns the internal mutable map directly, so callers can
+    // modify it. Presumably intentional for a shared registry — confirm.
+    public Map<String, InstanceStatus> getActiveInstancePool() {
+        return activeInstancePool;
+    }
+}

+ 116 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/common/PortPool.java

@@ -0,0 +1,116 @@
+package cn.com.yusys.manager.common;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Port pool management utility.
+ * Responsible for allocating, releasing and validating ports.
+ */
+@Slf4j
+@Component
+public class PortPool {
+
+    // Available port pool (key = port number, value = port number).
+    // NOTE(review): a Set or Queue would model this better than a Map whose
+    // value duplicates its key — consider ConcurrentLinkedQueue.
+    private final Map<Integer, Integer> availablePorts = new ConcurrentHashMap<>();
+
+    // Allocated port pool (records allocated ports to prevent double allocation;
+    // value is the allocating thread's name, for diagnostics).
+    private final Map<Integer, String> allocatedPorts = new ConcurrentHashMap<>();
+
+    // Port range configuration (hard-coded defaults; see initPortPool note).
+    private int portStart = 1030;
+    private int portEnd = 1050;
+
+    /**
+     * Initializes the port pool.
+     * Could be driven by @Value-injected configuration; defaults are kept here
+     * for now and should be replaced by values from the configuration file.
+     */
+    @PostConstruct
+    public void initPortPool() {
+        log.info("开始初始化端口池,端口范围:{} - {}", portStart, portEnd);
+        for (int port = portStart; port <= portEnd; port++) {
+                availablePorts.put(port, port);
+        }
+        log.info("端口池初始化完成,可用端口数:{}", availablePorts.size());
+    }
+
+    /**
+     * Allocates a port from the pool.
+     * NOTE(review): the hasNext/next/remove sequence below is NOT atomic across
+     * threads — two concurrent callers can each see the same first key before
+     * either removes it, and both would return the same port (the second
+     * keySet remove is a no-op). Consider claiming atomically, e.g. looping
+     * until availablePorts.remove(port) != null, or polling a concurrent queue.
+     * @return an available port number (null when the pool is empty)
+     */
+    public Integer allocatePort() {
+        // Remove via the key-set iterator (see thread-safety note above).
+        Iterator<Integer> portIterator = availablePorts.keySet().iterator();
+        if (portIterator.hasNext()) {
+            Integer port = portIterator.next();
+            // Move the port from the available pool to the allocated pool.
+            portIterator.remove();
+            allocatedPorts.put(port, Thread.currentThread().getName());
+            log.info("成功分配端口:{},剩余可用端口数:{}", port, availablePorts.size());
+            return port;
+        }
+        log.warn("端口池无可用端口,分配失败");
+        return null;
+    }
+
+    /**
+     * Releases a port (returns it to the available pool).
+     * A port that was never allocated is ignored with a warning.
+     * @param port the port number to release (null is tolerated)
+     */
+    public void releasePort(Integer port) {
+        if (port == null) {
+            log.warn("释放端口失败:端口号为null");
+            return;
+        }
+        // Remove from the allocated pool; remove() returning non-null proves
+        // this caller owned the port, so re-adding it to the pool is safe.
+        if (allocatedPorts.remove(port) != null) {
+                availablePorts.put(port, port);
+                log.info("成功释放端口:{},当前可用端口数:{}", port, availablePorts.size());
+
+        } else {
+            log.warn("端口{}未被分配,无需释放", port);
+        }
+    }
+
+
+
+    /**
+     * Returns the number of currently available ports.
+     */
+    public int getAvailablePortCount() {
+        return availablePorts.size();
+    }
+
+    /**
+     * Returns the set of currently allocated ports (a live view of the
+     * allocated pool, not a snapshot).
+     */
+    public Set<Integer> getAllocatedPortList() {
+        return allocatedPorts.keySet();
+    }
+
+    /**
+     * Dynamically expands the port pool. Ports already present in either pool
+     * are skipped.
+     * NOTE(review): the completion log prints the TOTAL available count, not
+     * the number of newly added ports as its message implies — confirm intent.
+     * @param newStart first port of the new range (inclusive)
+     * @param newEnd last port of the new range (inclusive)
+     */
+    public void expandPortPool(int newStart, int newEnd) {
+        log.info("扩展端口池,新增范围:{} - {}", newStart, newEnd);
+        for (int port = newStart; port <= newEnd; port++) {
+            if (!availablePorts.containsKey(port) && !allocatedPorts.containsKey(port)
+                    ) {
+                availablePorts.put(port, port);
+            }
+        }
+        log.info("端口池扩展完成,新增可用端口数:{}", availablePorts.size());
+    }
+
+
+}

+ 28 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/config/DockerConfig.java

@@ -0,0 +1,28 @@
+package cn.com.yusys.manager.config;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.core.DefaultDockerClientConfig;
+import com.github.dockerjava.core.DockerClientBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+
+/**
+ * Docker configuration class.
+ * Creates the DockerClient bean used to manage parse-service containers.
+ * NOTE(review): the client is built from default system environment settings
+ * (e.g. DOCKER_HOST); the docker.host value documented in application.yml is
+ * not applied here — confirm whether withDockerHost(...) was intended.
+ */
+@Configuration
+public class DockerConfig {
+
+    @Bean
+    public DockerClient dockerClient() {
+        // Reads Docker connection settings from the system environment automatically.
+        DefaultDockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
+                .withDockerTlsVerify(false)
+                .build();
+
+        // Build the client.
+        return DockerClientBuilder.getInstance(config).build();
+    }
+}

+ 53 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java

@@ -0,0 +1,53 @@
+package cn.com.yusys.manager.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Parse-service configuration constants, injected from application.yml.
+ * NOTE(review): public, non-final, UPPER_SNAKE_CASE instance fields are
+ * unconventional for @Value-injected config — @ConfigurationProperties with
+ * private camelCase fields would be more idiomatic. Fields are presumably
+ * accessed directly by other classes; confirm before renaming.
+ */
+@Component
+public class ParserConfig {
+    // Heartbeat timeout threshold (milliseconds).
+    @Value("${parser.monitor.heartbeat-timeout}")
+    public long HEARTBEAT_TIMEOUT;
+
+    // Minimum number of active instances.
+    @Value("${parser.monitor.min-active-instance}")
+    public int MIN_ACTIVE_INSTANCE;
+
+    // Maximum number of active instances.
+    @Value("${parser.monitor.max-active-instance}")
+    public int MAX_ACTIVE_INSTANCE;
+
+    // Task backlog threshold that triggers scale-out.
+    @Value("${parser.monitor.task-backlog-threshold}")
+    public int TASK_BACKLOG_THRESHOLD;
+
+    // Timeout for status-endpoint calls (milliseconds).
+    @Value("${parser.monitor.status-query-timeout}")
+    public int STATUS_QUERY_TIMEOUT;
+
+    // Threshold of consecutive status-endpoint failures.
+    @Value("${parser.monitor.status-query-fail-count}")
+    public int STATUS_QUERY_FAIL_COUNT;
+
+    // Java manager base URL.
+    @Value("${parser.manager.url}")
+    public String MANAGER_URL;
+
+    // Path of the Python instance's status endpoint.
+    public static final String STATUS_API = "/status";
+
+    // Instance image name.
+    @Value("${parser.instance.image}")
+    public String IMAGE_NAME;
+
+    // GPU load threshold (percentage).
+    @Value("${parser.monitor.gpu-load-threshold}")
+    public double GPU_LOAD_THRESHOLD;
+
+    // Number of instances to add when the GPU load threshold is exceeded.
+    @Value("${parser.monitor.gpu-scale-instance-num}")
+    public int GPU_SCALE_INSTANCE_NUM;
+}

+ 93 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/DockerInstanceManager.java

@@ -0,0 +1,93 @@
+package cn.com.yusys.manager.instanceManager.Impl;
+
+import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
+import cn.com.yusys.manager.instanceManager.InstanceManager;
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.CreateContainerResponse;
+import com.github.dockerjava.api.command.StopContainerCmd;
+import com.github.dockerjava.api.model.HostConfig;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+@Component
+public class DockerInstanceManager  implements InstanceManager {
+
+    @Value("${docker.host:tcp://127.0.0.1:2375}")
+    private String dockerHost;
+
+    private static final Logger log = LoggerFactory.getLogger(DockerInstanceManager.class);
+
+    @Resource
+    private ParseInstanceStatusRegistry instanceStateRegistry ;
+
+    private final DockerClient client;
+
+    public DockerInstanceManager(DockerClient dockerClient) {
+        this.client = dockerClient;
+    }
+    /**
+     * 拉起解析服务实例
+     * @param imageName Python解析服务的镜像名
+     * @param port 宿主机映射端口(避免端口冲突)
+     * @return 容器ID(用于后续终结实例)
+     */
+    public String startParseInstance(String imageName, int port) {
+        try {
+
+            HostConfig hostConfig = HostConfig.newHostConfig()
+                    .withPortBindings(new PortBinding(
+                            Ports.Binding.bindPort(port),
+                            new ExposedPort(8000) // Python服务内部端口
+                    ))
+                    .withMemory(2048 * 1024 * 1024L) // 限制内存2G
+                    .withCpuCount(2L); // 限制CPU核心数2
+
+            // 创建容器
+            CreateContainerResponse container = client.createContainerCmd(imageName)
+                    .withHostConfig(hostConfig)
+                    .withExposedPorts(new ExposedPort(8000))
+                    .withCmd("python", "parse_service.py", "--host", "0.0.0.0", "--port", "8000")
+                    .withName("python-parser-" + port) // 容器命名,便于识别
+                    .exec();
+
+            // 启动容器
+            client.startContainerCmd(container.getId()).exec();
+
+            String containerId = container.getId();
+            log.info("Python实例启动成功,容器ID:{},映射端口:{}", containerId, port);
+            return containerId;
+        } catch (Exception e) {
+            log.error("Python实例启动失败:{}", e.getMessage());
+            throw new RuntimeException("拉起Python实例失败:" + e.getMessage());
+        }
+    }
+
+    public void terminateInstance(String containerId) {
+        try {
+            // 1. 尝试优雅停止 (等待 10 秒)
+            StopContainerCmd stopCmd = client.stopContainerCmd(containerId);
+            stopCmd.withTimeout(20);
+            stopCmd.exec();
+
+            log.info("实例已停止: {}", containerId);
+
+            // 2. 移除容器 (释放资源)
+            client.removeContainerCmd(containerId)
+                    .withForce(true) // 如果还在运行则强制移除
+                    .withRemoveVolumes(true) // 清理匿名卷
+                    .exec();
+
+            log.info("实例容器已移除: {}", containerId);
+        } catch (Exception e) {
+            log.info("终结实例失败: {}", e.getMessage());
+        }
+    }
+
+}

+ 7 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/InstanceManager.java

@@ -0,0 +1,7 @@
+package cn.com.yusys.manager.instanceManager;
+
/**
 * Abstraction over parse-instance life-cycle management, so that back-ends
 * other than Docker can be plugged in later.
 */
public interface InstanceManager {
    /**
     * Starts a parse-service instance.
     *
     * @param imageName image name of the Python parse service
     * @param port      host port mapped to the instance
     * @return instance/container id used later to terminate the instance
     */
    String startParseInstance(String imageName, int port) ;
    /**
     * Stops and removes the instance identified by the given id.
     *
     * @param containerId id returned by {@link #startParseInstance}
     */
    void terminateInstance(String containerId) ;
}

+ 42 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatus.java

@@ -0,0 +1,42 @@
+package cn.com.yusys.manager.model;
+
+
+import lombok.Data;
+
@Data
public class InstanceStatus {

    // container id of the instance
    private String containerId;

    // instance name
    private String instanceName;

    // instance IP address
    private String ip;

    // host port mapped to the instance
    private Integer port;

    // instance state: 0 = idle, 1 = running, 2 = abnormal/lost
    private Integer status;

    // CPU usage of the instance
    private Double cpuUsage;

    // memory usage of the instance
    private Double memoryUsage;

    // GPU usage of the instance
    private Double gpuUsage;

    // GPU memory usage of the instance
    private Double gpuMemory;

    // timestamp (epoch millis) of the last successful heartbeat
    private Long lastHeartbeatTime;

    // consecutive status-query failure count
    // NOTE(review): not initialized anywhere visible at creation time — callers
    // must guard against null before unboxing.
    private Integer statusQueryFailCount;

}

+ 36 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatusResponse.java

@@ -0,0 +1,36 @@
+package cn.com.yusys.manager.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
/**
 * Response entity for the parse-service instance status endpoint.
 */
@Data
public class InstanceStatusResponse {
    // response code; 200 indicates success
    private Integer code;
    // human-readable message
    private String msg;
    // payload; may be null on malformed responses
    private InstanceStatusData data;

    /**
     * Response payload body, deserialized from snake_case JSON fields.
     */
    @Data
    public static class InstanceStatusData {

        // instance state reported by the Python service
        // NOTE(review): assumed to mirror InstanceStatus.status (0 idle / 1 running /
        // 2 abnormal) — confirm against the Python service contract.
        private Integer status;

        @JsonProperty("cpu_usage")
        private Double cpuUsage;

        @JsonProperty("gpu_usage")
        private Double gpuUsage;

        @JsonProperty("memory_usage")
        private Double memoryUsage;

        @JsonProperty("gpu_memory")
        private Double gpuMemory;
    }

}

+ 390 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -0,0 +1,390 @@
+package cn.com.yusys.manager.service;
+
+import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
+import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.InstanceStatus;
+import cn.com.yusys.manager.model.InstanceStatusResponse;
+import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
+import cn.com.yusys.manager.util.ParseInstanceClient;
+import cn.com.yusys.manager.common.PortPool; 
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * 解析实例监控核心服务
+ * 该服务负责管理解析实例的生命周期,包括实例的创建、状态监控、心跳检测和负载管理
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class InstanceMonitorService {
+
+    // 活跃实例池
+    @Resource
+    private ParseInstanceStatusRegistry instancestatusRegistry;
+
+    // 实例接口调用客户端
+    @Resource
+    private final ParseInstanceClient instanceClient;
+
+    // 解析服务配置
+    @Resource
+    private final ParserConfig parserConfig;
+
+    //docker实例管理器
+    @Resource
+    private DockerInstanceManager dockerInstanceManager;
+
+    // 注入独立的端口池管理工具类
+    @Resource
+    private PortPool portPool;
+
+    @PostConstruct
+    public void initParseInstance(){
+        log.info("开始初始化解析实例...");
+        //启动时初始化实例
+        for (int i = 0; i < parserConfig.MIN_ACTIVE_INSTANCE; i++) {
+            // 4. 使用PortPool分配端口
+            Integer port = portPool.allocatePort();
+            if(port != null){
+                String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
+                // 增加容器ID空值校验
+                if (containerId == null || containerId.isEmpty()) {
+                    log.error("初始化实例失败:Docker容器创建失败,端口:{}", port);
+                    portPool.releasePort(port); // 归还端口
+                    continue;
+                }
+                InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+            } else {
+                log.error("初始化实例失败:无可用端口");
+                break;
+            }
+        }
+        log.info("解析实例初始化完成,等待30秒后启动监控任务");
+    }
+
+    /**
+     * 核心监控定时任务:
+     * - initialDelay = 30000:首次执行延迟30秒
+     * - fixedRate = 5000:之后每5秒执行一次
+     */
+    @Scheduled(initialDelay = 30000, fixedRate = 5000)
+    public void parserInstanceMonitor() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        try {
+            log.info("执行解析实例监控,当前活跃实例数:{}", activeInstancePool.size());
+
+            // 1. 主动调用/status接口更新实例状态
+            updateInstanceStatusByApi();
+
+            // 2. 心跳超时检测
+            checkHeartbeatTimeout();
+
+            // 3. 校验活跃实例数,触发实例拉起
+            checkAndSpinUpInstance();
+
+            // 4. 根据GPU负载动态扩缩容
+            checkAndScaleUpByGpuLoad();
+
+        } catch (Exception e) {
+            log.error("解析实例监控任务执行失败", e);
+        }
+    }
+
+    /**
+     * 心跳超时检测:超过阈值停止实例
+     */
+    private void checkHeartbeatTimeout() {
+        long now = System.currentTimeMillis();
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        // 复制为新集合,避免遍历中修改原集合
+        Map<String, InstanceStatus> copyPool = new HashMap<>(activeInstancePool);
+
+        copyPool.forEach((instanceId, state) -> {
+            if (state.getLastHeartbeatTime() == null || now - state.getLastHeartbeatTime() > parserConfig.HEARTBEAT_TIMEOUT) {
+                log.warn("实例{}心跳超时,标记为失联", instanceId);
+                state.setStatus(2);
+                // 原子操作移除实例
+                activeInstancePool.remove(instanceId);
+                // 使用PortPool释放端口
+                portPool.releasePort(state.getPort());
+                // 捕获Docker操作异常
+                try {
+                    dockerInstanceManager.terminateInstance(state.getContainerId());
+                } catch (Exception e) {
+                    log.error("终止实例{}的Docker容器失败", instanceId, e);
+                }
+            }
+        });
+    }
+
+
+    /**
+     * 主动调用实例状态接口更新实例状态
+     */
+    private void updateInstanceStatusByApi() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        activeInstancePool.forEach((instanceId, instancestatus) -> {
+            // 仅处理正常状态的实例
+            if (instancestatus.getStatus()==1|| instancestatus.getStatus()==0) {
+                InstanceStatusResponse response = null;
+                try {
+                    // 增加接口调用异常捕获
+                    response = instanceClient.getInstanceStatus(
+                            instancestatus.getIp(), instancestatus.getPort()
+                    );
+                } catch (Exception e) {
+                    log.error("调用实例{}状态接口失败", instanceId, e);
+                    int failCount = instancestatus.getStatusQueryFailCount() + 1;
+                    instancestatus.setStatusQueryFailCount(failCount);
+                    if (failCount >= parserConfig.STATUS_QUERY_FAIL_COUNT) {
+                        log.warn("实例{}连续{}次状态查询失败,标记为失联", instanceId, parserConfig.STATUS_QUERY_FAIL_COUNT);
+                        instancestatus.setStatus(2);
+                    }
+                    return;
+                }
+
+                if (response != null) {
+                    // 增加data空值校验
+                    InstanceStatusResponse.InstanceStatusData data = response.getData();
+                    if (data == null) {
+                        log.warn("实例{}状态接口返回数据为空", instanceId);
+                        int failCount = instancestatus.getStatusQueryFailCount() + 1;
+                        instancestatus.setStatusQueryFailCount(failCount);
+                        return;
+                    }
+                    // 更新实例状态
+                    instancestatus.setStatus(data.getStatus());
+                    instancestatus.setStatusQueryFailCount(0); // 重置失败次数
+
+                    // 更新负载信息
+                    instancestatus.setCpuUsage(data.getCpuUsage());
+                    instancestatus.setMemoryUsage(data.getMemoryUsage());
+                    instancestatus.setGpuUsage(data.getGpuUsage());
+                    instancestatus.setLastHeartbeatTime(System.currentTimeMillis());
+                    instancestatus.setGpuMemory(data.getGpuMemory());
+                    log.info("实例{}状态查询成功,状态:{}", instanceId, data.getStatus());
+
+                } else {
+                    // 接口调用失败,累计失败次数
+                    int failCount = instancestatus.getStatusQueryFailCount() + 1;
+                    instancestatus.setStatusQueryFailCount(failCount);
+                    log.warn("实例{}状态查询失败,累计失败次数:{}", instanceId, failCount);
+
+                    // 超过失败次数阈值,标记为失联
+                    if (failCount >= parserConfig.STATUS_QUERY_FAIL_COUNT) {
+                        log.warn("实例{}连续{}次状态查询失败,标记为失联", instanceId, parserConfig.STATUS_QUERY_FAIL_COUNT);
+                        instancestatus.setStatus(2);
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * 校验活跃实例数,触发实例拉起
+     */
+    private void checkAndSpinUpInstance() {
+        // 获取有效活跃实例数
+        int currentActiveNum = getEffectiveActiveInstanceNum();
+        log.info("当前有效活跃实例数:{},最小要求:{}", currentActiveNum, parserConfig.MIN_ACTIVE_INSTANCE);
+
+        // 判断是否需要拉起实例
+        if (needSpinUpInstance(currentActiveNum)) {
+            int needCreateNum = parserConfig.MIN_ACTIVE_INSTANCE - currentActiveNum;
+            // 防止超过最大实例数
+            needCreateNum = Math.min(needCreateNum, parserConfig.MAX_ACTIVE_INSTANCE - currentActiveNum);
+
+            log.info("需要拉起{}个解析实例", needCreateNum);
+            for (int i = 0; i < needCreateNum; i++) {
+                // 使用PortPool分配端口
+                Integer port = portPool.allocatePort();
+                if(port != null){
+                    String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
+                    // 增加容器ID空值校验
+                    if (containerId == null || containerId.isEmpty()) {
+                        log.error("创建实例失败:Docker容器创建失败,端口:{}", port);
+                        portPool.releasePort(port); // 归还端口
+                        continue;
+                    }
+                    InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+                } else {
+                    log.error("创建实例失败:无可用端口");
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * 获取有效活跃实例数
+     */
+    private int getEffectiveActiveInstanceNum() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+
+        return (int) activeInstancePool.values().stream()
+                .filter(info -> info.getStatus()==1 || info.getStatus()==0)
+                .count();
+    }
+
+    /**
+     * 判断是否需要拉起实例
+     */
+    private boolean needSpinUpInstance(int currentActiveNum) {
+        // 条件1:活跃数小于最小值
+        boolean isLessThanMin = currentActiveNum < parserConfig.MIN_ACTIVE_INSTANCE;
+        // 条件2:任务积压超过阈值
+        boolean isTaskBacklog = getKafkaTaskBacklog() > parserConfig.TASK_BACKLOG_THRESHOLD;
+
+        return isLessThanMin || isTaskBacklog;
+    }
+
+    /**
+     * 获取Kafka任务积压量
+     */
+    private int getKafkaTaskBacklog() {
+        return 0;
+    }
+
+    /**
+     * 根据GPU负载检测并扩缩容实例
+     */
+    private void checkAndScaleUpByGpuLoad() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+
+        // 计算所有活跃实例的平均GPU负载
+        double avgGpuLoad = calculateAverageGpuLoad(activeInstancePool);
+        log.info("当前平均GPU负载:{}%,阈值:{}%", avgGpuLoad, parserConfig.GPU_LOAD_THRESHOLD);
+
+        // 判断是否需要扩容
+        if (avgGpuLoad < parserConfig.GPU_LOAD_THRESHOLD) {
+            int currentActiveNum = getEffectiveActiveInstanceNum();
+
+            // 检查是否可以扩容
+            if (currentActiveNum < parserConfig.MAX_ACTIVE_INSTANCE) {
+                ScaleUpInstance(currentActiveNum);
+            } else {
+                log.warn("GPU资源充足但已达到最大实例数({}),无法继续扩容", parserConfig.MAX_ACTIVE_INSTANCE);
+            }
+        }
+
+        //判断是否需要缩容
+        if (avgGpuLoad > parserConfig.GPU_LOAD_THRESHOLD) {
+            int currentActiveNum = getEffectiveActiveInstanceNum();
+
+            // 检查是否可以缩容
+            if (currentActiveNum > parserConfig.MIN_ACTIVE_INSTANCE){
+                int needScaleNum = Math.min(parserConfig.GPU_SCALE_INSTANCE_NUM, currentActiveNum - parserConfig.MIN_ACTIVE_INSTANCE);
+                ScaleDownInstance(needScaleNum, activeInstancePool);
+            }
+        }
+    }
+
+    /**
+     * 计算所有活跃实例的平均GPU负载
+     * @param activeInstancePool 活跃实例池
+     * @return 平均GPU负载(百分比)
+     */
+    private double calculateAverageGpuLoad(Map<String, InstanceStatus> activeInstancePool) {
+        return activeInstancePool.values().stream()
+                .filter(status -> status.getStatus() == 1 || status.getStatus() == 0) // 只统计正常状态的实例
+                .filter(status -> status.getGpuUsage() != null) // 过滤掉GPU使用率为null的实例
+                .mapToDouble(InstanceStatus::getGpuUsage)
+                .average()
+                .orElse(0.0);
+    }
+
+    //保存实例状态
+    private InstanceStatus saveInstanceStatus(String containerId,Integer port) {
+        InstanceStatus instanceStatus = new InstanceStatus();
+        instanceStatus.setIp("127.0.0.1");
+        instanceStatus.setPort(port);
+        instanceStatus.setLastHeartbeatTime(System.currentTimeMillis());
+        instanceStatus.setStatus(0);
+
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+
+        instanceStatus.setContainerId(containerId);
+        activeInstancePool.put(containerId, instanceStatus);
+        return instanceStatus;
+    }
+
+    //增加实例
+    private void ScaleUpInstance(int currentActiveNum){
+        int needCreateNum = parserConfig.GPU_SCALE_INSTANCE_NUM;
+        // 防止超过最大实例数
+        needCreateNum = Math.min(needCreateNum, parserConfig.MAX_ACTIVE_INSTANCE - currentActiveNum);
+
+        log.info("需要扩容{}个解析实例", needCreateNum);
+
+        for (int i = 0; i < needCreateNum; i++) {
+            // 使用PortPool分配端口
+            Integer port = portPool.allocatePort();
+            if(port != null){
+                String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
+                // 增加容器ID空值校验
+                if (containerId == null || containerId.isEmpty()) {
+                    log.error("GPU扩容实例失败:Docker容器创建失败,端口:{}", port);
+                    portPool.releasePort(port); // 归还端口
+                    continue;
+                }
+                InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+                log.info("基于GPU负载扩容,已创建实例,容器ID:{},端口:{}", containerId, port);
+            } else {
+                log.warn("端口池已满,无法继续GPU扩容");
+                break;
+            }
+        }
+    }
+    /**
+     * 执行缩容操作:优先关闭负载最低的实例
+     * @param needDownNum 需要缩容的实例数
+     * @param activeInstancePool 活跃实例池
+     */
+    private void ScaleDownInstance(int needDownNum, Map<String, InstanceStatus> activeInstancePool) {
+        // 1. 筛选出正常运行的实例,并按GPU负载升序排序(负载最低的优先关闭)
+        List<Map.Entry<String, InstanceStatus>> sortedInstances = activeInstancePool.entrySet().stream()
+                .filter(entry -> entry.getValue().getStatus() == 1 || entry.getValue().getStatus() == 0) // 仅正常实例
+                .filter(entry -> entry.getValue().getGpuUsage() != null) // 有GPU负载数据
+                .sorted(Comparator.comparingDouble(entry -> entry.getValue().getGpuUsage())) // 升序排序
+                .limit(needDownNum) // 只取需要缩容的数量
+                .collect(Collectors.toList());
+
+        // 2. 逐个关闭实例
+        for (Map.Entry<String, InstanceStatus> entry : sortedInstances) {
+            String instanceId = entry.getKey();
+            InstanceStatus instanceStatus = entry.getValue();
+
+            log.info("开始缩容实例:{},GPU负载:{}%,端口:{}",
+                    instanceId, instanceStatus.getGpuUsage(), instanceStatus.getPort());
+
+            try {
+                // 标记实例为失联
+                instanceStatus.setStatus(2);
+                // 从活跃实例池移除
+                activeInstancePool.remove(instanceId);
+                // 释放端口
+                portPool.releasePort(instanceStatus.getPort());
+                // 关闭Docker容器
+                dockerInstanceManager.terminateInstance(instanceStatus.getContainerId());
+
+                log.info("缩容实例{}成功,已关闭容器并释放端口{}", instanceId, instanceStatus.getPort());
+            } catch (Exception e) {
+                log.error("缩容实例{}失败", instanceId, e);
+                // 缩容失败时,将实例重新加入活跃池(避免端口丢失)
+                activeInstancePool.put(instanceId, instanceStatus);
+            }
+        }
+    }
+}

+ 104 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java

@@ -0,0 +1,104 @@
+package cn.com.yusys.manager.util;
+
+import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.InstanceStatusResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Resource;
+
+/**
+ * 解析服务实例实例接口调用工具
+ */
+@Slf4j
+@Component
+public class ParseInstanceClient {
+
+    @Resource
+    private ParserConfig parserConfig;
+
+    private final WebClient webClient;
+
+    // 通过构造函数注入WebClient.Builder,利用Spring自动配置
+    public ParseInstanceClient(WebClient.Builder webClientBuilder) {
+        this.webClient = webClientBuilder
+                .codecs(config -> config.defaultCodecs().maxInMemorySize(1024 * 1024))
+                .build();
+    }
+
+    /**
+     * 调用Python实例的/status接口获取状态
+     * @param instanceIp 实例IP
+     * @param instancePort 实例端口
+     * @return 状态响应体(包含正常/异常状态)
+     */
+    public InstanceStatusResponse getInstanceStatus(String instanceIp, Integer instancePort) {
+        // 1. 参数校验:返回包含错误信息的响应,而非null
+        if (instanceIp == null || instancePort == null) {
+            log.warn("实例IP或端口为空,跳过状态查询");
+            return createErrorResponse("实例IP或端口为空");
+        }
+
+        String statusUrl = String.format("http://%s:%d%s", instanceIp, instancePort, ParserConfig.STATUS_API);
+        try {
+            // 获取超时时间,增加默认值兜底
+            long timeout = parserConfig != null && parserConfig.STATUS_QUERY_TIMEOUT > 0
+                    ? parserConfig.STATUS_QUERY_TIMEOUT
+                    : 3000; // 默认3秒超时
+
+            InstanceStatusResponse response = webClient.get()
+                    .uri(statusUrl)
+                    .accept(MediaType.APPLICATION_JSON)
+                    .retrieve()
+                    // 处理非200状态码
+                    .onStatus(HttpStatus::isError, clientResponse ->
+                            Mono.error(new WebClientResponseException(
+                                    "状态接口返回异常状态码",
+                                    clientResponse.statusCode().value(),
+                                    clientResponse.statusCode().getReasonPhrase(),
+                                    null, null, null)))
+                    .bodyToMono(InstanceStatusResponse.class)
+                    // 设置超时时间(增加默认值)
+                    .timeout(java.time.Duration.ofMillis(timeout))
+                    .block();
+
+            // 2. 响应校验:返回有效响应或错误响应
+            if (response != null && 200 == response.getCode()) {
+                log.debug("实例{}:{}状态查询成功,状态:{}", instanceIp, instancePort, response.getData().getStatus());
+                return response;
+            } else {
+                log.warn("实例{}:{}状态查询返回失败,响应:{}", instanceIp, instancePort, response);
+                return createErrorResponse("接口返回非200响应");
+            }
+        } catch (WebClientResponseException e) {
+            log.error("实例{}:{}状态查询失败,HTTP状态码:{}", instanceIp, instancePort, e.getRawStatusCode(), e);
+            return createErrorResponse(String.format("HTTP请求失败,状态码:%d", e.getRawStatusCode()));
+        } catch (Exception e) {
+            log.error("实例{}:{}状态查询异常", instanceIp, instancePort, e);
+            return createErrorResponse(e.getMessage());
+        }
+    }
+
+    /**
+     * 创建错误响应对象
+     * @param errorMsg 错误信息
+     * @return 包含错误状态的响应对象
+     */
+    private InstanceStatusResponse createErrorResponse(String errorMsg) {
+        InstanceStatusResponse errorResponse = new InstanceStatusResponse();
+        errorResponse.setCode(500); // 自定义错误码
+        errorResponse.setMsg(errorMsg);
+
+
+        InstanceStatusResponse.InstanceStatusData data = new InstanceStatusResponse.InstanceStatusData();
+        data.setStatus(2); // 标记为离线状态
+        errorResponse.setData(data);
+
+        return errorResponse;
+    }
+}

+ 62 - 0
schedule-manager/src/main/resources/application.yml

@@ -0,0 +1,62 @@
server:
  port: 8083

spring:
  application:
    name: yusp-kafka-manager
  kafka:
    bootstrap-servers: 10.192.72.13:9092
    consumer:
      group-id: yusp-topic-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# Optional: declare the listened topics here for easier management
kafka:
  topics:
    listen: test-topic

docker:
  host: tcp://127.0.0.1:2375 # Docker daemon address
  tls-verify: false # whether TLS authentication is enabled
  cert-path: /path/to/certs # TLS certificate path (optional)

# Parse-service monitoring configuration
parser:
  # Monitoring parameters
  monitor:
    # Heartbeat timeout threshold in milliseconds (30 s)
    heartbeat-timeout: 30000

    # Minimum number of active instances kept in the pool
    min-active-instance: 3

    # Maximum number of active instances (resource ceiling)
    max-active-instance: 10

    # Kafka lag threshold that triggers temporary scale-out (> 100)
    task-backlog-threshold: 100

    # Status endpoint call timeout in milliseconds
    status-query-timeout: 5000

    # Consecutive status-query failures before an instance is considered lost
    status-query-fail-count: 3

    # GPU load threshold (percent) used for scale decisions
    gpu-load-threshold: 80

    # Number of instances added per GPU-based scale-out step
    gpu-scale-instance-num: 1

  # Java manager configuration
  manager:
    # Address of the Java manager (used by Python instances for registration callbacks)
    url: http://localhost:8080

  # Instance image configuration
  instance:
    # Docker image name of the parse-service instance
    image: parse-service:latest

+ 41 - 0
schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/DockerInstanceManagerTest.java

@@ -0,0 +1,41 @@
+package cn.com.yusys.manager.instanceManager;
+
+import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+public class DockerInstanceManagerTest {
+
+    // 正确注入DockerInstanceManager(而非底层的DockerClient)
+    @Autowired
+    private DockerInstanceManager dockerInstanceManager;
+
+    @Test
+    public void testStartPythonInstance() {
+        try {
+            // 注意:替换为你实际构建的Python镜像名(如python-parser:final)
+            String imageName = "parse-service";
+            int port = 8081; // 宿主机映射端口
+
+            // 启动Python容器
+            String containerId = dockerInstanceManager.startParseInstance(imageName, port);
+            System.out.println("启动的容器ID:" + containerId);
+
+            // 模拟业务执行后,停止并删除容器
+            // dockerInstanceManager.terminateInstance(containerId);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+    @Test
+    public void testStopInstance() {
+        try {
+            String containerId = "1ab258aaafb5f6202b05b8ffb20d5a376b0fefc9d5acae9da885339009f76fb4";
+            dockerInstanceManager.terminateInstance(containerId);
+        } catch (Exception e) {
+            e.printStackTrace();
+            }
+    }
+}

+ 57 - 0
schedule-manager/src/test/java/cn/com/yusys/manager/util/ParseInstanceClientTest.java

@@ -0,0 +1,57 @@
+package cn.com.yusys.manager.util;
+
+import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.InstanceStatusResponse;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import javax.annotation.Resource;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.when;
+
+/**
+ * ParseInstanceClient 的单元测试
+ */
+@SpringBootTest
+class ParseInstanceClientTest {
+
+
+    @Resource
+    private ParseInstanceClient parseInstanceClient;
+
+
+    @Test
+    void testInstanceStatus() {
+        // 1. 准备测试数据
+        String testIp = "127.0.0.1";
+
+        // 3. 执行测试
+        InstanceStatusResponse response = parseInstanceClient.getInstanceStatus(testIp, 1030);
+
+        // 4. 验证结果
+        assertNotNull(response);
+        assertEquals(200, response.getCode());
+        assertNotNull(response.getData());
+        assertEquals(0, response.getData().getStatus());
+    }
+
+
+
+
+    @Test
+    void getInstanceStatus_NullInput_ReturnsNull() {
+        // 1. 测试空 IP
+        InstanceStatusResponse response1 = parseInstanceClient.getInstanceStatus(null, 8080);
+        assertNull(response1);
+
+        // 2. 测试空端口
+        InstanceStatusResponse response2 = parseInstanceClient.getInstanceStatus("127.0.0.1", null);
+        assertNull(response2);
+    }
+
+}

+ 59 - 0
schedule-producer/pom.xml

@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.example</groupId>
+        <artifactId>four-level-schedule</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>schedule-producer</artifactId>
+    <packaging>jar</packaging>
+    <name>Schedule Producer</name>
+
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>schedule-producer</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 11 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/ProducerApplication.java

@@ -0,0 +1,11 @@
+package cn.com.yusys.producer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
/**
 * Spring Boot entry point for the schedule-producer service.
 */
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

+ 37 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/config/KafkaProducerConfig.java

@@ -0,0 +1,37 @@
package cn.com.yusys.producer.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer configuration.
 *
 * <p>Builds a {@link ProducerFactory} and {@link KafkaTemplate} with
 * String key/value serializers, acks=all and 3 retries, matching the
 * settings in application.yml.
 */
@Configuration
public class KafkaProducerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerConfig.class);

    /** Kafka broker list, injected from spring.kafka.bootstrap-servers. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer factory with String serializers and durable-delivery settings.
     *
     * @return a {@link DefaultKafkaProducerFactory} for String/String records
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        // Was System.out.println(...) — use parameterized SLF4J logging instead,
        // consistent with MessageSender in this module.
        log.info("Configuring Kafka producer with bootstrap servers: {}", bootstrapServers);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // acks=all: wait for the full ISR to acknowledge each record.
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Template used by {@code MessageSender} to publish records.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // Direct call is safe: @Configuration CGLIB proxying returns the singleton bean.
        return new KafkaTemplate<>(producerFactory());
    }
}

+ 58 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/controller/ProducerController.java

@@ -0,0 +1,58 @@
package cn.com.yusys.producer.controller;

import cn.com.yusys.producer.model.Message;
import cn.com.yusys.producer.service.MessageSender;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoints for publishing messages to Kafka.
 */
@RestController
@RequestMapping("/api")
public class ProducerController {

    @Resource
    private MessageSender messageSender;

    /**
     * 通用发送接口 — generic send endpoint.
     *
     * <p>Validates that both the topic and the message body are present
     * (both are documented as required on {@link Message}), then hands the
     * record to {@link MessageSender} for asynchronous publication.
     *
     * @param message request payload carrying topic, optional key and body
     * @return a simple success map; the actual send is asynchronous
     * @throws IllegalArgumentException if topic or message body is blank
     */
    @PostMapping("/send")
    public Map<String, Object> sendMsg(@RequestBody Message message) {

        String topic = message.getTopic();
        String key = message.getKey();
        String msg = message.getMessage();

        // Topic is required — fail fast with a clear error.
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("Topic name cannot be empty");
        }
        // Message body is documented as required (必填); without this check a
        // null value would be sent to Kafka as a tombstone record silently.
        if (msg == null || msg.trim().isEmpty()) {
            throw new IllegalArgumentException("Message body cannot be empty");
        }

        messageSender.send(topic, key, msg);

        Map<String, Object> res = new HashMap<>();
        res.put("status", "success");
        res.put("topic", topic);
        res.put("msg", "Message queued");
        return res;
    }

    /**
     * 批量测试接口 — batch test endpoint: sends one message to each of a
     * fixed set of topics.
     *
     * @param msg base message body; the target topic name is appended
     * @return a success map listing the topics that were targeted
     */
    @PostMapping("/send-batch")
    public Map<String, Object> sendBatch(@RequestParam(defaultValue = "batch-msg") String msg) {
        String[] topics = {"yusp-topic-A", "yusp-topic-B", "yusp-topic-C"};

        for (String topic : topics) {
            messageSender.send(topic, "batch-key", msg + "-for-" + topic);
        }

        Map<String, Object> res = new HashMap<>();
        res.put("status", "success");
        res.put("msg", "Messages sent to multiple topics: " + String.join(", ", topics));
        return res;
    }
}

+ 38 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/model/Message.java

@@ -0,0 +1,38 @@
+package cn.com.yusys.producer.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Kafka 消息发送请求参数对象
+ * 对应前端/调用方传入的 JSON 结构
+ */
+@Data
+public class Message  {
+
+
+    /**
+     * 目标 Topic 名称 (必填)
+     * 支持动态指定,如 "yusp-topic-A"
+     */
+    private String topic;
+
+    /**
+     * 消息 Key (可选)
+     * 用于分区策略,如果为 null,Kafka 会使用轮询或随机策略
+     */
+    private String key;
+
+    /**
+     * 消息体内容 (必填)
+     * 可以是纯文本,也可以是 JSON 字符串
+     */
+    private String message;
+
+
+
+}

+ 46 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/service/MessageSender.java

@@ -0,0 +1,46 @@
+package cn.com.yusys.producer.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.annotation.Resource;
+
+@Service
+public class MessageSender {
+    private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
+
+    @Resource
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    /**
+     * 向指定 Topic 发送消息
+     * @param topic 目标 Topic 名称
+     * @param key 消息键
+     * @param message 消息内容
+     */
+    public void send(String topic, String key, String message) {
+        log.info("Sending message to Topic [{}]: Key={}, Msg={}", topic, key, message);
+
+        // send 方法第一个参数即为 Topic
+        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
+
+        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
+            @Override
+            public void onSuccess(SendResult<String, String> result) {
+                log.info("Sent successfully to Topic [{}]. Offset: {}",
+                        result.getRecordMetadata().topic(),
+                        result.getRecordMetadata().offset());
+            }
+
+            @Override
+            public void onFailure(Throwable ex) {
+                log.error("Send failed to Topic [{}]", topic, ex);
+            }
+        });
+    }
+}

+ 18 - 0
schedule-producer/src/main/resources/application.yml

@@ -0,0 +1,18 @@
+server:
+  port: 8081
+
+spring:
+  application:
+    name: schedule-producer
+  kafka:
+    bootstrap-servers: 10.192.72.13:9092
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      acks: all
+      retries: 3
+    topics:
+      test-topic:
+        partitions: 6
+
+