Browse Source

增加解析服务管理模块

zsh 3 weeks ago
parent
commit
66f062c39a
22 changed files with 1317 additions and 171 deletions
  1. 2 0
      .idea/encodings.xml
  2. 8 0
      .idea/inspectionProfiles/Project_Default.xml
  3. 6 0
      .idea/vcs.xml
  4. 141 170
      README.md
  5. 1 0
      pom.xml
  6. BIN
      schedule-manager/doc/解析服务管理流程图.png
  7. 95 0
      schedule-manager/pom.xml
  8. 13 0
      schedule-manager/src/main/java/cn/com/yusys/manager/ManagerApplication.java
  9. 21 0
      schedule-manager/src/main/java/cn/com/yusys/manager/common/ParseInstanceStatusRegistry.java
  10. 116 0
      schedule-manager/src/main/java/cn/com/yusys/manager/common/PortPool.java
  11. 28 0
      schedule-manager/src/main/java/cn/com/yusys/manager/config/DockerConfig.java
  12. 53 0
      schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java
  13. 93 0
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/DockerInstanceManager.java
  14. 7 0
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/InstanceManager.java
  15. 42 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatus.java
  16. 36 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatusResponse.java
  17. 390 0
      schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java
  18. 104 0
      schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java
  19. 62 0
      schedule-manager/src/main/resources/application.yml
  20. 41 0
      schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/DockerInstanceManagerTest.java
  21. 57 0
      schedule-manager/src/test/java/cn/com/yusys/manager/util/ParseInstanceClientTest.java
  22. 1 1
      schedule-producer/src/main/resources/application.yml

+ 2 - 0
.idea/encodings.xml

@@ -3,6 +3,8 @@
   <component name="Encoding">
     <file url="file://$PROJECT_DIR$/schedule-consumer/src/main/java" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/schedule-consumer/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-manager/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/schedule-manager/src/main/resources" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/schedule-producer/src/main/java" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/schedule-producer/src/main/resources" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />

+ 8 - 0
.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,8 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="AutoCloseableResource" enabled="true" level="WARNING" enabled_by_default="true">
+      <option name="METHOD_MATCHER_CONFIG" value="java.util.Formatter,format,java.io.Writer,append,com.google.common.base.Preconditions,checkNotNull,org.hibernate.Session,close,java.io.PrintWriter,printf,java.io.PrintStream,printf,java.lang.foreign.Arena,ofAuto,java.lang.foreign.Arena,global,cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager,dockerClient" />
+    </inspection_tool>
+  </profile>
+</component>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 141 - 170
README.md

@@ -1,237 +1,208 @@
-# 四级调度系统 (Four-Level Schedule System)
+# 四级调度系统 (Four-Level Schedule)
 
-基于Spring Boot和Kafka的分布式消息调度系统,实现了生产者-消费者模式的消息传递与处理。
+## 项目简介
 
-## 项目概述
+四级调度系统是一个基于Spring Boot和Kafka的分布式任务调度系统,通过Producer-Consumer模式实现任务的生产与消费,并配备Manager模块进行实例管理和动态扩缩容。系统支持Docker容器化部署,能够根据任务积压情况和资源负载自动调整实例数量,实现高效的资源利用和任务处理。
 
-本项目是一个基于Kafka消息队列的分布式系统,包含两个核心模块:
-- **schedule-producer**: 消息生产者模块,负责生成和发送消息到Kafka主题
-- **schedule-consumer**: 消息消费者模块,负责监听和处理Kafka主题中的消息
+## 系统架构
 
-系统支持多主题消息传递、消息重试机制、死信队列处理等功能,适用于需要高可靠性和可扩展性的消息处理场景。
+系统由三个核心模块组成:
 
-## 技术栈
+1. **schedule-producer**: 任务生产者模块,负责创建和发送任务到Kafka消息队列
+2. **schedule-consumer**: 任务消费者模块,负责从Kafka消费并处理任务
+3. **schedule-manager**: 管理模块,负责监控实例状态、管理Docker容器和动态扩缩容
 
-- **Java**: 1.8
-- **Spring Boot**: 2.7.18
-- **Spring Kafka**: 2.8.11
-- **Kafka**: Apache Kafka
-- **构建工具**: Maven
-- **其他依赖**:
-  - FastJSON 1.2.83
-  - Lombok 1.18.30
+## 技术栈
 
-## 项目结构
-
-```
-four-level-schedule/
-├── pom.xml                          # 父级POM文件
-├── schedule-producer/               # 生产者模块
-│   ├── pom.xml
-│   └── src/main/
-│       ├── java/cn/com/yusys/producer/
-│       │   ├── ProducerApplication.java
-│       │   ├── config/
-│       │   │   └── KafkaProducerConfig.java
-│       │   ├── controller/
-│       │   │   └── ProducerController.java
-│       │   └── service/
-│       │       └── MessageSender.java
-│       └── resources/
-│           └── application.yml
-└── schedule-consumer/               # 消费者模块
-    ├── pom.xml
-    └── src/main/
-        ├── java/cn/com/yusys/consumer/
-        │   ├── ConsumerApplication.java
-        │   ├── config/
-        │   │   └── KafkaConsumerConfig.java
-        │   └── listener/
-        │       └── MessageListener.java
-        └── resources/
-            └── application.yml
-```
+- Java 8
+- Spring Boot 2.7.18
+- Spring Kafka 2.8.11
+- Docker (通过Docker Java API)
+- Fastjson 1.2.83
+- Lombok 1.18.30
 
 ## 模块说明
 
-### 1. schedule-producer (生产者模块)
-
-生产者模块提供RESTful API接口,用于向Kafka主题发送消息。
+### schedule-producer
 
-**主要功能**:
-- 支持向指定主题发送单条消息
-- 支持批量向多个主题发送消息
-- 消息发送回调处理(成功/失败日志记录)
-- 消息键值对支持
+任务生产者模块,提供RESTful API用于创建和发送任务。
 
-**API接口**:
-- `GET /api/send`: 向指定主题发送单条消息
-  - 参数: topic(主题名), msg(消息内容), key(消息键,可选)
-  - 返回: 发送状态和消息信息
+**主要功能:**
+- 接收任务请求
+- 将任务序列化并发送到Kafka主题
+- 支持任务重试机制
 
-- `GET /api/send-batch`: 批量向多个主题发送消息
-  - 参数: msg(消息内容,默认为"batch-msg")
-  - 返回: 批量发送状态
-
-**配置说明**:
+**配置说明:**
 - 服务端口: 8081
-- Kafka服务器: localhost:9092
-- 生产者配置: acks=all, retries=3
+- Kafka服务器: 10.192.72.13:9092
+- 默认主题: test-topic (6个分区)
 
-### 2. schedule-consumer (消费者模块)
+### schedule-consumer
 
-消费者模块监听Kafka主题并处理消息,具备错误处理和重试机制
+任务消费者模块,监听Kafka主题并消费任务。
 
-**主要功能**:
-- 多主题监听(yusp-topic-A, yusp-topic-B, yusp-topic-C)
-- 手动提交偏移量
-- 消息重试机制(最多重试3次)
-- 死信队列处理(重试失败后发送到死信主题)
-- 业务异常模拟(当消息内容为"error"时抛出异常)
+**主要功能:**
+- 监听Kafka主题
+- 消费并处理任务
+- 支持手动提交offset
 
-**配置说明**:
+**配置说明:**
 - 服务端口: 8082
-- Kafka服务器: localhost:9092
-- 消费者组: yusp-multi-topic-group
-- 自动提交: false(手动提交)
-- 偏移重置策略: earliest
-- 并发消费数: 3
+- Kafka消费者组: yusp-topic-group
+- 监听主题: test-topic
 
-## 快速开始
+### schedule-manager
 
-### 前置条件
+管理模块,负责监控和管理解析服务实例。
 
-1. 安装JDK 1.8或更高版本
-2. 安装Maven 3.6或更高版本
-3. 安装并启动Apache Kafka(默认配置: localhost:9092)
+**主要功能:**
+- 监控实例健康状态
+- 管理Docker容器生命周期
+- 根据任务积压和资源负载动态扩缩容
+- GPU负载监控
 
+**配置说明:**
+- 服务端口: 8083
+- Docker守护进程地址: tcp://127.0.0.1:2375
+- 心跳超时阈值: 30000ms
+- 最小活跃实例数: 3
+- 最大活跃实例数: 10
+- 任务积压阈值: 100
+- GPU负载阈值: 80%
 
+## 环境要求
 
-### 使用示例
+- JDK 8+
+- Maven 3.6+
+- Docker (用于Manager模块)
+- Kafka 2.8+
 
-1. 发送单条消息:
-```bash
-curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=Hello%20World&key=test-key"
-```
+## 快速开始
 
-2. 批量发送消息:
-```bash
-curl "http://localhost:8081/api/send-batch?msg=Batch%20Message"
-```
+### 1. 克隆项目
 
-3. 测试错误处理(会触发重试和死信队列):
 ```bash
-curl "http://localhost:8081/api/send?topic=yusp-topic-A&msg=error"
+git clone <repository-url>
+cd four-level-schedule
 ```
 
-## 配置说明
+### 2. 配置Kafka连接
 
-### 生产者配置 (schedule-producer/src/main/resources/application.yml)
+修改各模块的`application.yml`文件,配置Kafka服务器地址:
 
 ```yaml
-server:
-  port: 8081
-
 spring:
-  application:
-    name: schedule-producer
   kafka:
-    bootstrap-servers: localhost:9092
-    producer:
-      key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      value-serializer: org.apache.kafka.common.serialization.StringSerializer
-      acks: all
-      retries: 3
+    bootstrap-servers: <your-kafka-server>:9092
 ```
 
-### 消费者配置 (schedule-consumer/src/main/resources/application.yml)
+### 3. 配置Docker连接
 
-```yaml
-server:
-  port: 8082
+修改`schedule-manager/src/main/resources/application.yml`,配置Docker守护进程地址:
 
-spring:
-  application:
-    name: yusp-kafka-consumer
-  kafka:
-    bootstrap-servers: localhost:9092
-    consumer:
-      group-id: yusp-multi-topic-group
-      enable-auto-commit: false
-      auto-offset-reset: earliest
-      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-
-kafka:
-  topics:
-    listen: yusp-topic-A,yusp-topic-B,yusp-topic-C
+```yaml
+docker:
+  host: tcp://<your-docker-host>:2375
 ```
 
-## 核心功能说明
+### 4. 编译项目
 
-### 消息重试机制
+```bash
+mvn clean package
+```
 
-消费者模块实现了消息重试机制,当消息处理失败时:
-1. 自动重试最多3次,每次间隔1秒
-2. 重试失败后,消息被发送到死信主题(原主题名后加.DLT)
-3. 死信主题中的消息需要人工干预或特殊处理
+### 5. 启动服务
 
-### 手动提交偏移量
+分别启动三个模块:
 
-消费者采用手动提交模式,确保消息处理成功后才提交偏移量,避免消息丢失。
+```bash
+# 启动Producer
+java -jar schedule-producer/target/schedule-producer.jar
 
-### 多主题监听
+# 启动Consumer
+java -jar schedule-consumer/target/schedule-consumer.jar
 
-消费者可以同时监听多个主题,根据配置文件中的`kafka.topics.listen`参数动态配置。
+# 启动Manager
+java -jar schedule-manager/target/schedule-manager.jar
+```
 
-## 构建与部署
+## 使用指南
 
-### 构建可执行JAR
+### 发送任务
 
-```bash
-# 构建生产者JAR
-cd schedule-producer
-mvn clean package
+通过Producer模块提供的RESTful API发送任务:
 
-# 构建消费者JAR
-cd schedule-consumer
-mvn clean package
+```bash
+curl -X POST http://localhost:8081/api/task \
+  -H "Content-Type: application/json" \
+  -d '{"taskData": "your task data"}'
 ```
 
-生成的JAR文件位于:
-- 生产者: `schedule-producer/target/schedule-producer.jar`
-- 消费者: `schedule-consumer/target/schedule-consumer.jar`
+### 监控任务
+
+Manager模块会自动监控任务处理情况和实例状态,并根据配置的阈值进行扩缩容。
+
+## 配置说明
+
+### Kafka配置
+
+所有模块都使用相同的Kafka配置,包括bootstrap-servers、序列化器等。可根据实际环境调整。
+
+### Manager监控配置
+
+Manager模块提供了丰富的监控配置选项:
+
+- `heartbeat-timeout`: 心跳超时阈值
+- `min-active-instance`: 最小活跃实例数
+- `max-active-instance`: 最大活跃实例数
+- `task-backlog-threshold`: 任务积压阈值
+- `gpu-load-threshold`: GPU负载阈值
+
+## 项目结构
+
+```
+four-level-schedule/
+├── pom.xml                          # 父POM文件
+├── schedule-producer/               # 任务生产者模块
+│   ├── pom.xml
+│   └── src/
+│       ├── main/
+│       │   ├── java/
+│       │   └── resources/
+│       └── test/
+├── schedule-consumer/               # 任务消费者模块
+│   ├── pom.xml
+│   └── src/
+│       ├── main/
+│       │   ├── java/
+│       │   └── resources/
+│       └── test/
+└── schedule-manager/                # 管理模块
+    ├── pom.xml
+    ├── doc/                         # 文档目录
+    └── src/
+        ├── main/
+        │   ├── java/
+        │   └── resources/
+        └── test/
+```
 
+## 常见问题
 
-## 注意事项
+### 1. Kafka连接失败
 
-1. 确保Kafka服务已正确启动并可访问
-2. 生产环境和测试环境应使用不同的Kafka集群配置
-3. 根据实际需求调整重试次数和间隔时间
-4. 监控死信主题,及时处理失败消息
-5. 在生产环境中,建议配置Kafka的安全认证机制
+检查Kafka服务器地址和端口是否正确,确保Kafka服务正常运行。
 
-## 故障排查
+### 2. Docker连接失败
 
-### 消费者无法连接Kafka
-- 检查Kafka服务是否启动
-- 确认bootstrap-servers配置正确
-- 检查网络连接和防火墙设置
+检查Docker守护进程是否启用,确保Docker API端口(2375)已开放。
 
-### 消息处理失败
-- 查看消费者日志,确认错误原因
-- 检查死信主题中的失败消息
-- 调整业务逻辑或增加重试次数
+### 3. 实例无法自动扩缩容
 
-### 消息丢失
-- 确认消费者配置中`enable-auto-commit`为false
-- 确保生产者配置中`acks`为all
-- 检查Kafka的副本因子配置
+检查Manager模块的监控配置是否合理,确保任务积压和资源负载达到阈值。
 
 ## 许可证
 
-本项目采用内部许可证,仅供学习和研究使用。
+本项目采用内部许可证,未经授权不得用于商业用途。
 
 ## 联系方式
 
-如有问题或建议,请联系项目维护团队。
+如有问题或建议,请联系项目维护团队。

+ 1 - 0
pom.xml

@@ -11,6 +11,7 @@
     <modules>
         <module>schedule-producer</module>
         <module>schedule-consumer</module>
+        <module>schedule-manager</module>
     </modules>
 
     <!-- 统一属性管理 -->

BIN
schedule-manager/doc/解析服务管理流程图.png


+ 95 - 0
schedule-manager/pom.xml

@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.example</groupId>
+        <artifactId>four-level-schedule</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>cn.com.yusys.manager</groupId>
+    <artifactId>schedule-manager</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <packaging>jar</packaging>
+    <name>Schedule Manager</name>
+
+
+    <dependencies>
+        <!-- Web 依赖保留用于 Actuator 或健康检查,虽然主要逻辑是监听 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+        <!-- Docker Java客户端 -->
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java</artifactId>
+            <version>3.3.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java-transport-httpclient5</artifactId>
+            <version>3.3.4</version>
+        </dependency>
+        <!-- Spring WebFlux(WebClient) -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+        </dependency>
+
+        <!-- SpringBoot测试核心依赖 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- Junit5(SpringBoot 2.4+默认集成) -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>schedule-manager</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 13 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/ManagerApplication.java

@@ -0,0 +1,13 @@
+package cn.com.yusys.manager;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+public class ManagerApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(ManagerApplication.class, args);
+    }
+}

+ 21 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/common/ParseInstanceStatusRegistry.java

@@ -0,0 +1,21 @@
+package cn.com.yusys.manager.common;
+
+import cn.com.yusys.manager.model.InstanceStatus;
+import org.springframework.stereotype.Component;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 解析服务实例状态注册表
+ * 用于管理和维护所有解析服务实例的状态信息
+ */
+@Component
+public class ParseInstanceStatusRegistry {
+    //保存解析服务实例状态
+    private final Map<String, InstanceStatus> activeInstancePool = new ConcurrentHashMap<>();
+
+
+    public Map<String, InstanceStatus> getActiveInstancePool() {
+        return activeInstancePool;
+    }
+}

+ 116 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/common/PortPool.java

@@ -0,0 +1,116 @@
+package cn.com.yusys.manager.common;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 端口池管理工具类
+ * 负责端口的分配、释放、可用性校验
+ */
+@Slf4j
+@Component
+public class PortPool {
+
+    // 可用端口池(key=端口号,value=端口号)
+    private final Map<Integer, Integer> availablePorts = new ConcurrentHashMap<>();
+
+    // 已分配端口池(记录已分配的端口,防止重复分配)
+    private final Map<Integer, String> allocatedPorts = new ConcurrentHashMap<>();
+
+    // 端口配置
+    private int portStart = 1030;
+    private int portEnd = 1050;
+
+    /**
+     * 初始化端口池
+     * 可通过@Value注入配置,这里先保留默认值,实际使用时替换为配置文件读取
+     */
+    @PostConstruct
+    public void initPortPool() {
+        log.info("开始初始化端口池,端口范围:{} - {}", portStart, portEnd);
+        for (int port = portStart; port <= portEnd; port++) {
+                availablePorts.put(port, port);
+        }
+        log.info("端口池初始化完成,可用端口数:{}", availablePorts.size());
+    }
+
+    /**
+     * 原子化分配端口
+     * @return 可用端口号(无可用端口时返回null)
+     */
+    public Integer allocatePort() {
+        // 迭代器原子操作,避免并发冲突
+        Iterator<Integer> portIterator = availablePorts.keySet().iterator();
+        if (portIterator.hasNext()) {
+            Integer port = portIterator.next();
+            // 从可用池移除,加入已分配池
+            portIterator.remove();
+            allocatedPorts.put(port, Thread.currentThread().getName());
+            log.info("成功分配端口:{},剩余可用端口数:{}", port, availablePorts.size());
+            return port;
+        }
+        log.warn("端口池无可用端口,分配失败");
+        return null;
+    }
+
+    /**
+     * 释放端口(归还到端口池)
+     * @param port 要释放的端口号
+     */
+    public void releasePort(Integer port) {
+        if (port == null) {
+            log.warn("释放端口失败:端口号为null");
+            return;
+        }
+        // 从已分配池移除
+        if (allocatedPorts.remove(port) != null) {
+                availablePorts.put(port, port);
+                log.info("成功释放端口:{},当前可用端口数:{}", port, availablePorts.size());
+
+        } else {
+            log.warn("端口{}未被分配,无需释放", port);
+        }
+    }
+
+
+
+    /**
+     * 获取可用端口数
+     */
+    public int getAvailablePortCount() {
+        return availablePorts.size();
+    }
+
+    /**
+     * 获取已分配端口列表
+     */
+    public Set<Integer> getAllocatedPortList() {
+        return allocatedPorts.keySet();
+    }
+
+    /**
+     * 动态扩展端口池
+     * @param newStart 新增端口起始值
+     * @param newEnd 新增端口结束值
+     */
+    public void expandPortPool(int newStart, int newEnd) {
+        log.info("扩展端口池,新增范围:{} - {}", newStart, newEnd);
+        for (int port = newStart; port <= newEnd; port++) {
+            if (!availablePorts.containsKey(port) && !allocatedPorts.containsKey(port)
+                    ) {
+                availablePorts.put(port, port);
+            }
+        }
+        log.info("端口池扩展完成,新增可用端口数:{}", availablePorts.size());
+    }
+
+
+}

+ 28 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/config/DockerConfig.java

@@ -0,0 +1,28 @@
+package cn.com.yusys.manager.config;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.core.DefaultDockerClientConfig;
+import com.github.dockerjava.core.DockerClientBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+
+/**
+ * Docker配置类
+ * 用于配置和创建Docker客户端的Bean
+ */
+@Configuration
+public class DockerConfig {
+
+    @Bean
+    public DockerClient dockerClient() {
+        // 自动读取系统环境变量
+        DefaultDockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
+                .withDockerTlsVerify(false)
+                .build();
+
+        // 构建客户端
+        return DockerClientBuilder.getInstance(config).build();
+    }
+}

+ 53 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java

@@ -0,0 +1,53 @@
+package cn.com.yusys.manager.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * 解析服务配置常量
+ */
+@Component
+public class ParserConfig {
+    // 心跳超时阈值(毫秒)
+    @Value("${parser.monitor.heartbeat-timeout}")
+    public long HEARTBEAT_TIMEOUT;
+
+    // 最小活跃实例数
+    @Value("${parser.monitor.min-active-instance}")
+    public int MIN_ACTIVE_INSTANCE;
+
+    // 最大活跃实例数
+    @Value("${parser.monitor.max-active-instance}")
+    public int MAX_ACTIVE_INSTANCE;
+
+    // 任务积压阈值
+    @Value("${parser.monitor.task-backlog-threshold}")
+    public int TASK_BACKLOG_THRESHOLD;
+
+    // 状态接口调用超时(毫秒)
+    @Value("${parser.monitor.status-query-timeout}")
+    public int STATUS_QUERY_TIMEOUT;
+
+    // 状态接口连续失败次数阈值
+    @Value("${parser.monitor.status-query-fail-count}")
+    public int STATUS_QUERY_FAIL_COUNT;
+
+    // Java管理器地址
+    @Value("${parser.manager.url}")
+    public String MANAGER_URL;
+
+    // Python实例状态接口路径
+    public static final String STATUS_API = "/status";
+
+    //实例镜像名称
+    @Value("${parser.instance.image}")
+    public String IMAGE_NAME;
+
+    // GPU负载阈值(百分比)
+    @Value("${parser.monitor.gpu-load-threshold}")
+    public double GPU_LOAD_THRESHOLD;
+
+    // GPU负载扩容实例数
+    @Value("${parser.monitor.gpu-scale-instance-num}")
+    public int GPU_SCALE_INSTANCE_NUM;
+}

+ 93 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/DockerInstanceManager.java

@@ -0,0 +1,93 @@
+package cn.com.yusys.manager.instanceManager.Impl;
+
+import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
+import cn.com.yusys.manager.instanceManager.InstanceManager;
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.CreateContainerResponse;
+import com.github.dockerjava.api.command.StopContainerCmd;
+import com.github.dockerjava.api.model.HostConfig;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+@Component
+public class DockerInstanceManager  implements InstanceManager {
+
+    @Value("${docker.host:tcp://127.0.0.1:2375}")
+    private String dockerHost;
+
+    private static final Logger log = LoggerFactory.getLogger(DockerInstanceManager.class);
+
+    @Resource
+    private ParseInstanceStatusRegistry instanceStateRegistry ;
+
+    private final DockerClient client;
+
+    public DockerInstanceManager(DockerClient dockerClient) {
+        this.client = dockerClient;
+    }
+    /**
+     * 拉起解析服务实例
+     * @param imageName Python解析服务的镜像名
+     * @param port 宿主机映射端口(避免端口冲突)
+     * @return 容器ID(用于后续终结实例)
+     */
+    public String startParseInstance(String imageName, int port) {
+        try {
+
+            HostConfig hostConfig = HostConfig.newHostConfig()
+                    .withPortBindings(new PortBinding(
+                            Ports.Binding.bindPort(port),
+                            new ExposedPort(8000) // Python服务内部端口
+                    ))
+                    .withMemory(2048 * 1024 * 1024L) // 限制内存2G
+                    .withCpuCount(2L); // 限制CPU核心数2
+
+            // 创建容器
+            CreateContainerResponse container = client.createContainerCmd(imageName)
+                    .withHostConfig(hostConfig)
+                    .withExposedPorts(new ExposedPort(8000))
+                    .withCmd("python", "parse_service.py", "--host", "0.0.0.0", "--port", "8000")
+                    .withName("python-parser-" + port) // 容器命名,便于识别
+                    .exec();
+
+            // 启动容器
+            client.startContainerCmd(container.getId()).exec();
+
+            String containerId = container.getId();
+            log.info("Python实例启动成功,容器ID:{},映射端口:{}", containerId, port);
+            return containerId;
+        } catch (Exception e) {
+            log.error("Python实例启动失败:{}", e.getMessage());
+            throw new RuntimeException("拉起Python实例失败:" + e.getMessage());
+        }
+    }
+
+    public void terminateInstance(String containerId) {
+        try {
+            // 1. 尝试优雅停止 (等待 20 秒,与下方 withTimeout(20) 保持一致)
+            StopContainerCmd stopCmd = client.stopContainerCmd(containerId);
+            stopCmd.withTimeout(20);
+            stopCmd.exec();
+
+            log.info("实例已停止: {}", containerId);
+
+            // 2. 移除容器 (释放资源)
+            client.removeContainerCmd(containerId)
+                    .withForce(true) // 如果还在运行则强制移除
+                    .withRemoveVolumes(true) // 清理匿名卷
+                    .exec();
+
+            log.info("实例容器已移除: {}", containerId);
+        } catch (Exception e) {
+            log.info("终结实例失败: {}", e.getMessage());
+        }
+    }
+
+}

+ 7 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/InstanceManager.java

@@ -0,0 +1,7 @@
+package cn.com.yusys.manager.instanceManager;
+
+// 实例管理器抽象接口
+public interface InstanceManager {
+    String startParseInstance(String imageName, int port) ;
+    void terminateInstance(String containerId) ;
+}

+ 42 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatus.java

@@ -0,0 +1,42 @@
+package cn.com.yusys.manager.model;
+
+
+import lombok.Data;
+
+@Data
+public class InstanceStatus {
+
+    //实例容器id
+    private String containerId;
+
+    //实例名称
+    private String instanceName;
+
+    //实例ip
+    private String ip;
+
+    //实例端口
+    private Integer port;
+
+    //实例状态 0-空闲 1-运行中 2-异常
+    private Integer status;
+
+    //实例cpu占用
+    private Double cpuUsage;
+
+    //实例内存占用
+    private Double memoryUsage;
+
+    //实例gpu占用
+    private Double gpuUsage;
+
+    //实例gpu内存占用
+    private Double gpuMemory;
+
+    //上次心跳时间
+    private Long lastHeartbeatTime;
+
+    //查询失败次数
+    private Integer statusQueryFailCount;
+
+}

+ 36 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatusResponse.java

@@ -0,0 +1,36 @@
+package cn.com.yusys.manager.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+/**
+ * 解析服务实例状态响应实体
+ */
+@Data
+public class InstanceStatusResponse {
+    private Integer code;
+    private String msg;
+    private InstanceStatusData data;
+
+    /**
+     * 响应数据体
+     */
+    @Data
+    public static class InstanceStatusData {
+
+        private Integer status;
+
+        @JsonProperty("cpu_usage")
+        private Double cpuUsage;
+
+        @JsonProperty("gpu_usage")
+        private Double gpuUsage;
+
+        @JsonProperty("memory_usage")
+        private Double memoryUsage;
+
+        @JsonProperty("gpu_memory")
+        private Double gpuMemory;
+    }
+
+}

+ 390 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -0,0 +1,390 @@
+package cn.com.yusys.manager.service;
+
+import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
+import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.InstanceStatus;
+import cn.com.yusys.manager.model.InstanceStatusResponse;
+import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
+import cn.com.yusys.manager.util.ParseInstanceClient;
+import cn.com.yusys.manager.common.PortPool; 
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * 解析实例监控核心服务
+ * 该服务负责管理解析实例的生命周期,包括实例的创建、状态监控、心跳检测和负载管理
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class InstanceMonitorService {
+
+    // 活跃实例池
+    @Resource
+    private ParseInstanceStatusRegistry instancestatusRegistry;
+
+    // 实例接口调用客户端
+    @Resource
+    private final ParseInstanceClient instanceClient;
+
+    // 解析服务配置
+    @Resource
+    private final ParserConfig parserConfig;
+
+    //docker实例管理器
+    @Resource
+    private DockerInstanceManager dockerInstanceManager;
+
+    // 注入独立的端口池管理工具类
+    @Resource
+    private PortPool portPool;
+
+    @PostConstruct
+    public void initParseInstance(){
+        log.info("开始初始化解析实例...");
+        //启动时初始化实例
+        for (int i = 0; i < parserConfig.MIN_ACTIVE_INSTANCE; i++) {
+            // 4. 使用PortPool分配端口
+            Integer port = portPool.allocatePort();
+            if(port != null){
+                String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
+                // 增加容器ID空值校验
+                if (containerId == null || containerId.isEmpty()) {
+                    log.error("初始化实例失败:Docker容器创建失败,端口:{}", port);
+                    portPool.releasePort(port); // 归还端口
+                    continue;
+                }
+                InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+            } else {
+                log.error("初始化实例失败:无可用端口");
+                break;
+            }
+        }
+        log.info("解析实例初始化完成,等待30秒后启动监控任务");
+    }
+
+    /**
+     * 核心监控定时任务:
+     * - initialDelay = 30000:首次执行延迟30秒
+     * - fixedRate = 5000:之后每5秒执行一次
+     */
+    @Scheduled(initialDelay = 30000, fixedRate = 5000)
+    public void parserInstanceMonitor() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        try {
+            log.info("执行解析实例监控,当前活跃实例数:{}", activeInstancePool.size());
+
+            // 1. 主动调用/status接口更新实例状态
+            updateInstanceStatusByApi();
+
+            // 2. 心跳超时检测
+            checkHeartbeatTimeout();
+
+            // 3. 校验活跃实例数,触发实例拉起
+            checkAndSpinUpInstance();
+
+            // 4. 根据GPU负载动态扩缩容
+            checkAndScaleUpByGpuLoad();
+
+        } catch (Exception e) {
+            log.error("解析实例监控任务执行失败", e);
+        }
+    }
+
+    /**
+     * 心跳超时检测:超过阈值停止实例
+     */
+    private void checkHeartbeatTimeout() {
+        long now = System.currentTimeMillis();
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        // 复制为新集合,避免遍历中修改原集合
+        Map<String, InstanceStatus> copyPool = new HashMap<>(activeInstancePool);
+
+        copyPool.forEach((instanceId, state) -> {
+            if (state.getLastHeartbeatTime() == null || now - state.getLastHeartbeatTime() > parserConfig.HEARTBEAT_TIMEOUT) {
+                log.warn("实例{}心跳超时,标记为失联", instanceId);
+                state.setStatus(2);
+                // 原子操作移除实例
+                activeInstancePool.remove(instanceId);
+                // 使用PortPool释放端口
+                portPool.releasePort(state.getPort());
+                // 捕获Docker操作异常
+                try {
+                    dockerInstanceManager.terminateInstance(state.getContainerId());
+                } catch (Exception e) {
+                    log.error("终止实例{}的Docker容器失败", instanceId, e);
+                }
+            }
+        });
+    }
+
+
+    /**
+     * 主动调用实例状态接口更新实例状态
+     */
+    private void updateInstanceStatusByApi() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        activeInstancePool.forEach((instanceId, instancestatus) -> {
+            // 仅处理正常状态的实例
+            if (instancestatus.getStatus()==1|| instancestatus.getStatus()==0) {
+                InstanceStatusResponse response = null;
+                try {
+                    // 增加接口调用异常捕获
+                    response = instanceClient.getInstanceStatus(
+                            instancestatus.getIp(), instancestatus.getPort()
+                    );
+                } catch (Exception e) {
+                    log.error("调用实例{}状态接口失败", instanceId, e);
+                    int failCount = instancestatus.getStatusQueryFailCount() + 1;
+                    instancestatus.setStatusQueryFailCount(failCount);
+                    if (failCount >= parserConfig.STATUS_QUERY_FAIL_COUNT) {
+                        log.warn("实例{}连续{}次状态查询失败,标记为失联", instanceId, parserConfig.STATUS_QUERY_FAIL_COUNT);
+                        instancestatus.setStatus(2);
+                    }
+                    return;
+                }
+
+                if (response != null) {
+                    // 增加data空值校验
+                    InstanceStatusResponse.InstanceStatusData data = response.getData();
+                    if (data == null) {
+                        log.warn("实例{}状态接口返回数据为空", instanceId);
+                        int failCount = instancestatus.getStatusQueryFailCount() + 1;
+                        instancestatus.setStatusQueryFailCount(failCount);
+                        return;
+                    }
+                    // 更新实例状态
+                    instancestatus.setStatus(data.getStatus());
+                    instancestatus.setStatusQueryFailCount(0); // 重置失败次数
+
+                    // 更新负载信息
+                    instancestatus.setCpuUsage(data.getCpuUsage());
+                    instancestatus.setMemoryUsage(data.getMemoryUsage());
+                    instancestatus.setGpuUsage(data.getGpuUsage());
+                    instancestatus.setLastHeartbeatTime(System.currentTimeMillis());
+                    instancestatus.setGpuMemory(data.getGpuMemory());
+                    log.info("实例{}状态查询成功,状态:{}", instanceId, data.getStatus());
+
+                } else {
+                    // 接口调用失败,累计失败次数
+                    int failCount = instancestatus.getStatusQueryFailCount() + 1;
+                    instancestatus.setStatusQueryFailCount(failCount);
+                    log.warn("实例{}状态查询失败,累计失败次数:{}", instanceId, failCount);
+
+                    // 超过失败次数阈值,标记为失联
+                    if (failCount >= parserConfig.STATUS_QUERY_FAIL_COUNT) {
+                        log.warn("实例{}连续{}次状态查询失败,标记为失联", instanceId, parserConfig.STATUS_QUERY_FAIL_COUNT);
+                        instancestatus.setStatus(2);
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * 校验活跃实例数,触发实例拉起
+     */
+    private void checkAndSpinUpInstance() {
+        // 获取有效活跃实例数
+        int currentActiveNum = getEffectiveActiveInstanceNum();
+        log.info("当前有效活跃实例数:{},最小要求:{}", currentActiveNum, parserConfig.MIN_ACTIVE_INSTANCE);
+
+        // 判断是否需要拉起实例
+        if (needSpinUpInstance(currentActiveNum)) {
+            int needCreateNum = parserConfig.MIN_ACTIVE_INSTANCE - currentActiveNum;
+            // 防止超过最大实例数
+            needCreateNum = Math.min(needCreateNum, parserConfig.MAX_ACTIVE_INSTANCE - currentActiveNum);
+
+            log.info("需要拉起{}个解析实例", needCreateNum);
+            for (int i = 0; i < needCreateNum; i++) {
+                // 使用PortPool分配端口
+                Integer port = portPool.allocatePort();
+                if(port != null){
+                    String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
+                    // 增加容器ID空值校验
+                    if (containerId == null || containerId.isEmpty()) {
+                        log.error("创建实例失败:Docker容器创建失败,端口:{}", port);
+                        portPool.releasePort(port); // 归还端口
+                        continue;
+                    }
+                    InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+                } else {
+                    log.error("创建实例失败:无可用端口");
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * 获取有效活跃实例数
+     */
+    private int getEffectiveActiveInstanceNum() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+
+        return (int) activeInstancePool.values().stream()
+                .filter(info -> info.getStatus()==1 || info.getStatus()==0)
+                .count();
+    }
+
+    /**
+     * 判断是否需要拉起实例
+     */
+    private boolean needSpinUpInstance(int currentActiveNum) {
+        // 条件1:活跃数小于最小值
+        boolean isLessThanMin = currentActiveNum < parserConfig.MIN_ACTIVE_INSTANCE;
+        // 条件2:任务积压超过阈值
+        boolean isTaskBacklog = getKafkaTaskBacklog() > parserConfig.TASK_BACKLOG_THRESHOLD;
+
+        return isLessThanMin || isTaskBacklog;
+    }
+
+    /**
+     * 获取Kafka任务积压量
+     */
+    private int getKafkaTaskBacklog() {
+        return 0;
+    }
+
+    /**
+     * 根据GPU负载检测并扩缩容实例
+     */
+    private void checkAndScaleUpByGpuLoad() {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+
+        // 计算所有活跃实例的平均GPU负载
+        double avgGpuLoad = calculateAverageGpuLoad(activeInstancePool);
+        log.info("当前平均GPU负载:{}%,阈值:{}%", avgGpuLoad, parserConfig.GPU_LOAD_THRESHOLD);
+
+        // 判断是否需要扩容
+        if (avgGpuLoad < parserConfig.GPU_LOAD_THRESHOLD) {
+            int currentActiveNum = getEffectiveActiveInstanceNum();
+
+            // 检查是否可以扩容
+            if (currentActiveNum < parserConfig.MAX_ACTIVE_INSTANCE) {
+                ScaleUpInstance(currentActiveNum);
+            } else {
+                log.warn("GPU资源充足但已达到最大实例数({}),无法继续扩容", parserConfig.MAX_ACTIVE_INSTANCE);
+            }
+        }
+
+        //判断是否需要缩容
+        if (avgGpuLoad > parserConfig.GPU_LOAD_THRESHOLD) {
+            int currentActiveNum = getEffectiveActiveInstanceNum();
+
+            // 检查是否可以缩容
+            if (currentActiveNum > parserConfig.MIN_ACTIVE_INSTANCE){
+                int needScaleNum = Math.min(parserConfig.GPU_SCALE_INSTANCE_NUM, currentActiveNum - parserConfig.MIN_ACTIVE_INSTANCE);
+                ScaleDownInstance(needScaleNum, activeInstancePool);
+            }
+        }
+    }
+
+    /**
+     * 计算所有活跃实例的平均GPU负载
+     * @param activeInstancePool 活跃实例池
+     * @return 平均GPU负载(百分比)
+     */
+    private double calculateAverageGpuLoad(Map<String, InstanceStatus> activeInstancePool) {
+        return activeInstancePool.values().stream()
+                .filter(status -> status.getStatus() == 1 || status.getStatus() == 0) // 只统计正常状态的实例
+                .filter(status -> status.getGpuUsage() != null) // 过滤掉GPU使用率为null的实例
+                .mapToDouble(InstanceStatus::getGpuUsage)
+                .average()
+                .orElse(0.0);
+    }
+
+    //保存实例状态
+    private InstanceStatus saveInstanceStatus(String containerId,Integer port) {
+        InstanceStatus instanceStatus = new InstanceStatus();
+        instanceStatus.setIp("127.0.0.1");
+        instanceStatus.setPort(port);
+        instanceStatus.setLastHeartbeatTime(System.currentTimeMillis());
+        instanceStatus.setStatus(0);
+
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+
+        instanceStatus.setContainerId(containerId);
+        activeInstancePool.put(containerId, instanceStatus);
+        return instanceStatus;
+    }
+
+    //增加实例
+    private void ScaleUpInstance(int currentActiveNum){
+        int needCreateNum = parserConfig.GPU_SCALE_INSTANCE_NUM;
+        // 防止超过最大实例数
+        needCreateNum = Math.min(needCreateNum, parserConfig.MAX_ACTIVE_INSTANCE - currentActiveNum);
+
+        log.info("需要扩容{}个解析实例", needCreateNum);
+
+        for (int i = 0; i < needCreateNum; i++) {
+            // 使用PortPool分配端口
+            Integer port = portPool.allocatePort();
+            if(port != null){
+                String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
+                // 增加容器ID空值校验
+                if (containerId == null || containerId.isEmpty()) {
+                    log.error("GPU扩容实例失败:Docker容器创建失败,端口:{}", port);
+                    portPool.releasePort(port); // 归还端口
+                    continue;
+                }
+                InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+                log.info("基于GPU负载扩容,已创建实例,容器ID:{},端口:{}", containerId, port);
+            } else {
+                log.warn("端口池已满,无法继续GPU扩容");
+                break;
+            }
+        }
+    }
+    /**
+     * 执行缩容操作:优先关闭负载最低的实例
+     * @param needDownNum 需要缩容的实例数
+     * @param activeInstancePool 活跃实例池
+     */
+    private void ScaleDownInstance(int needDownNum, Map<String, InstanceStatus> activeInstancePool) {
+        // 1. 筛选出正常运行的实例,并按GPU负载升序排序(负载最低的优先关闭)
+        List<Map.Entry<String, InstanceStatus>> sortedInstances = activeInstancePool.entrySet().stream()
+                .filter(entry -> entry.getValue().getStatus() == 1 || entry.getValue().getStatus() == 0) // 仅正常实例
+                .filter(entry -> entry.getValue().getGpuUsage() != null) // 有GPU负载数据
+                .sorted(Comparator.comparingDouble(entry -> entry.getValue().getGpuUsage())) // 升序排序
+                .limit(needDownNum) // 只取需要缩容的数量
+                .collect(Collectors.toList());
+
+        // 2. 逐个关闭实例
+        for (Map.Entry<String, InstanceStatus> entry : sortedInstances) {
+            String instanceId = entry.getKey();
+            InstanceStatus instanceStatus = entry.getValue();
+
+            log.info("开始缩容实例:{},GPU负载:{}%,端口:{}",
+                    instanceId, instanceStatus.getGpuUsage(), instanceStatus.getPort());
+
+            try {
+                // 标记实例为失联
+                instanceStatus.setStatus(2);
+                // 从活跃实例池移除
+                activeInstancePool.remove(instanceId);
+                // 释放端口
+                portPool.releasePort(instanceStatus.getPort());
+                // 关闭Docker容器
+                dockerInstanceManager.terminateInstance(instanceStatus.getContainerId());
+
+                log.info("缩容实例{}成功,已关闭容器并释放端口{}", instanceId, instanceStatus.getPort());
+            } catch (Exception e) {
+                log.error("缩容实例{}失败", instanceId, e);
+                // 缩容失败时,将实例重新加入活跃池(避免端口丢失)
+                activeInstancePool.put(instanceId, instanceStatus);
+            }
+        }
+    }
+}

+ 104 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java

@@ -0,0 +1,104 @@
+package cn.com.yusys.manager.util;
+
+import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.InstanceStatusResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Resource;
+
+/**
+ * 解析服务实例实例接口调用工具
+ */
+@Slf4j
+@Component
+public class ParseInstanceClient {
+
+    @Resource
+    private ParserConfig parserConfig;
+
+    private final WebClient webClient;
+
+    // 通过构造函数注入WebClient.Builder,利用Spring自动配置
+    public ParseInstanceClient(WebClient.Builder webClientBuilder) {
+        this.webClient = webClientBuilder
+                .codecs(config -> config.defaultCodecs().maxInMemorySize(1024 * 1024))
+                .build();
+    }
+
+    /**
+     * 调用Python实例的/status接口获取状态
+     * @param instanceIp 实例IP
+     * @param instancePort 实例端口
+     * @return 状态响应体(包含正常/异常状态)
+     */
+    public InstanceStatusResponse getInstanceStatus(String instanceIp, Integer instancePort) {
+        // 1. 参数校验:返回包含错误信息的响应,而非null
+        if (instanceIp == null || instancePort == null) {
+            log.warn("实例IP或端口为空,跳过状态查询");
+            return createErrorResponse("实例IP或端口为空");
+        }
+
+        String statusUrl = String.format("http://%s:%d%s", instanceIp, instancePort, ParserConfig.STATUS_API);
+        try {
+            // 获取超时时间,增加默认值兜底
+            long timeout = parserConfig != null && parserConfig.STATUS_QUERY_TIMEOUT > 0
+                    ? parserConfig.STATUS_QUERY_TIMEOUT
+                    : 3000; // 默认3秒超时
+
+            InstanceStatusResponse response = webClient.get()
+                    .uri(statusUrl)
+                    .accept(MediaType.APPLICATION_JSON)
+                    .retrieve()
+                    // 处理非200状态码
+                    .onStatus(HttpStatus::isError, clientResponse ->
+                            Mono.error(new WebClientResponseException(
+                                    "状态接口返回异常状态码",
+                                    clientResponse.statusCode().value(),
+                                    clientResponse.statusCode().getReasonPhrase(),
+                                    null, null, null)))
+                    .bodyToMono(InstanceStatusResponse.class)
+                    // 设置超时时间(增加默认值)
+                    .timeout(java.time.Duration.ofMillis(timeout))
+                    .block();
+
+            // 2. 响应校验:返回有效响应或错误响应
+            if (response != null && 200 == response.getCode()) {
+                log.debug("实例{}:{}状态查询成功,状态:{}", instanceIp, instancePort, response.getData().getStatus());
+                return response;
+            } else {
+                log.warn("实例{}:{}状态查询返回失败,响应:{}", instanceIp, instancePort, response);
+                return createErrorResponse("接口返回非200响应");
+            }
+        } catch (WebClientResponseException e) {
+            log.error("实例{}:{}状态查询失败,HTTP状态码:{}", instanceIp, instancePort, e.getRawStatusCode(), e);
+            return createErrorResponse(String.format("HTTP请求失败,状态码:%d", e.getRawStatusCode()));
+        } catch (Exception e) {
+            log.error("实例{}:{}状态查询异常", instanceIp, instancePort, e);
+            return createErrorResponse(e.getMessage());
+        }
+    }
+
+    /**
+     * 创建错误响应对象
+     * @param errorMsg 错误信息
+     * @return 包含错误状态的响应对象
+     */
+    private InstanceStatusResponse createErrorResponse(String errorMsg) {
+        InstanceStatusResponse errorResponse = new InstanceStatusResponse();
+        errorResponse.setCode(500); // 自定义错误码
+        errorResponse.setMsg(errorMsg);
+
+
+        InstanceStatusResponse.InstanceStatusData data = new InstanceStatusResponse.InstanceStatusData();
+        data.setStatus(2); // 标记为离线状态
+        errorResponse.setData(data);
+
+        return errorResponse;
+    }
+}

+ 62 - 0
schedule-manager/src/main/resources/application.yml

@@ -0,0 +1,62 @@
+server:
+  port: 8083
+
+spring:
+  application:
+    name: yusp-kafka-manager
+  kafka:
+    bootstrap-servers: 10.192.72.13:9092
+    consumer:
+      group-id: yusp-topic-group
+      enable-auto-commit: false
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+
+# 可选:在配置中定义 Topic 列表,方便管理
+kafka:
+  topics:
+    listen: test-topic
+
+docker:
+  host: tcp://127.0.0.1:2375 # Docker Daemon地址
+  tls-verify: false # 是否开启TLS认证
+  cert-path: /path/to/certs # TLS证书路径(可选)
+
+# 解析服务监控配置
+parser:
+  # 监控参数配置
+  monitor:
+    # 心跳超时阈值(毫秒)30000ms (30秒)
+    heartbeat-timeout: 30000
+
+    # 最小活跃实例数,最小实例池保持数量
+    min-active-instance: 3
+
+    # 最大活跃实例数,资源上限
+    max-active-instance: 10
+
+    # 任务积压阈值,触发临时扩容的 Kafka Lag 阈值 (> 100)
+    task-backlog-threshold: 100
+
+    # 状态接口调用超时(毫秒),
+    status-query-timeout: 5000
+
+    # 状态接口连续失败次数阈值,用于判断实例是否失联
+    status-query-fail-count: 3
+
+    # GPU负载监控配置
+    gpu-load-threshold: 80
+
+    #GPU负载扩容实例数
+    gpu-scale-instance-num: 1
+
+  # Java 管理器配置
+  manager:
+    # Java 管理端的访问地址(用于 Python 实例注册回调)
+    url: http://localhost:8080
+
+  # 实例镜像配置
+  instance:
+    # 解析服务实例的 Docker 镜像名称
+    image: parse-service:latest

+ 41 - 0
schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/DockerInstanceManagerTest.java

@@ -0,0 +1,41 @@
+package cn.com.yusys.manager.instanceManager;
+
+import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Integration tests for DockerInstanceManager.
+ * Requires a reachable Docker daemon and the parse-service image.
+ */
+@SpringBootTest
+public class DockerInstanceManagerTest {
+
+    // Inject the manager under test (not the underlying DockerClient)
+    @Autowired
+    private DockerInstanceManager dockerInstanceManager;
+
+    /**
+     * Starts a container and verifies a container id is returned.
+     * Exceptions now propagate and fail the test — the original caught and
+     * printed them, so the test could never fail.
+     */
+    @Test
+    public void testStartPythonInstance() {
+        // Replace with the actually built image name if different
+        String imageName = "parse-service";
+        int port = 8081; // published host port
+
+        String containerId = dockerInstanceManager.startParseInstance(imageName, port);
+        System.out.println("启动的容器ID:" + containerId);
+
+        assertNotNull(containerId, "容器ID不应为空");
+        assertFalse(containerId.isEmpty(), "容器ID不应为空字符串");
+
+        // Clean up after verification if desired
+        // dockerInstanceManager.terminateInstance(containerId);
+    }
+
+    /**
+     * Terminates a known container; any exception now fails the test.
+     */
+    @Test
+    public void testStopInstance() {
+        String containerId = "1ab258aaafb5f6202b05b8ffb20d5a376b0fefc9d5acae9da885339009f76fb4";
+        dockerInstanceManager.terminateInstance(containerId);
+    }
+}

+ 57 - 0
schedule-manager/src/test/java/cn/com/yusys/manager/util/ParseInstanceClientTest.java

@@ -0,0 +1,57 @@
+package cn.com.yusys.manager.util;
+
+import cn.com.yusys.manager.model.InstanceStatusResponse;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import javax.annotation.Resource;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Integration tests for ParseInstanceClient.
+ * The happy-path test requires a parse-service instance on 127.0.0.1:1030.
+ */
+@SpringBootTest
+class ParseInstanceClientTest {
+
+    @Resource
+    private ParseInstanceClient parseInstanceClient;
+
+    /**
+     * Happy path: a reachable instance answers with code 200, a data body
+     * and an idle (0) status.
+     */
+    @Test
+    void testInstanceStatus() {
+        String testIp = "127.0.0.1";
+
+        InstanceStatusResponse response = parseInstanceClient.getInstanceStatus(testIp, 1030);
+
+        assertNotNull(response);
+        assertEquals(200, response.getCode());
+        assertNotNull(response.getData());
+        assertEquals(0, response.getData().getStatus());
+    }
+
+    /**
+     * Null IP / port must yield the synthetic error response (code 500,
+     * data status 2), never null — the original test asserted null, which
+     * contradicts getInstanceStatus's documented behaviour and always failed.
+     */
+    @Test
+    void getInstanceStatus_NullInput_ReturnsErrorResponse() {
+        // 1. Null IP
+        InstanceStatusResponse response1 = parseInstanceClient.getInstanceStatus(null, 8080);
+        assertNotNull(response1);
+        assertEquals(500, response1.getCode());
+        assertEquals(2, response1.getData().getStatus());
+
+        // 2. Null port
+        InstanceStatusResponse response2 = parseInstanceClient.getInstanceStatus("127.0.0.1", null);
+        assertNotNull(response2);
+        assertEquals(500, response2.getCode());
+        assertEquals(2, response2.getData().getStatus());
+    }
+
+}

+ 1 - 1
schedule-producer/src/main/resources/application.yml

@@ -13,6 +13,6 @@ spring:
       retries: 3
     topics:
       test-topic:
-        partitions: 3
+        partitions: 6