Quellcode durchsuchen

配置修改

# Conflicts:
#	.idea/encodings.xml
zsh vor 2 Wochen
Ursprung
Commit
a71e9a8cc8
27 geänderte Dateien mit 969 neuen und 219 gelöschten Zeilen
  1. 0 55
      parser/.dockerignore
  2. 0 46
      parser/dockerfile
  3. BIN
      parser/examples/test1.pdf
  4. 27 9
      schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java
  5. 50 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/model/TaskRecordRequest.java
  6. 2 2
      schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java
  7. 64 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/util/TaskRecordClient.java
  8. 4 4
      schedule-consumer/src/main/resources/application.yml
  9. 21 4
      schedule-manager/pom.xml
  10. 2 0
      schedule-manager/src/main/java/cn/com/yusys/manager/ManagerApplication.java
  11. 33 0
      schedule-manager/src/main/java/cn/com/yusys/manager/config/ManagerConfig.java
  12. 0 3
      schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java
  13. 1 23
      schedule-manager/src/main/java/cn/com/yusys/manager/controller/ManagerController.java
  14. 68 0
      schedule-manager/src/main/java/cn/com/yusys/manager/controller/TaskController.java
  15. 59 0
      schedule-manager/src/main/java/cn/com/yusys/manager/entity/TaskRecordEntity.java
  16. 57 12
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/DockerInstanceManager.java
  17. 320 0
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/ProcessInstanceManager.java
  18. 3 2
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/InstanceManager.java
  19. 50 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/TaskRecordRequest.java
  20. 12 0
      schedule-manager/src/main/java/cn/com/yusys/manager/repository/TaskRecordMapper.java
  21. 70 41
      schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java
  22. 12 0
      schedule-manager/src/main/java/cn/com/yusys/manager/service/TaskRecordService.java
  23. 43 0
      schedule-manager/src/main/java/cn/com/yusys/manager/service/impl/TaskRecordServiceImpl.java
  24. 25 16
      schedule-manager/src/main/resources/application.yml
  25. 1 1
      schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/DockerInstanceManagerTest.java
  26. 44 0
      schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/ProcessInstanceManagerTest.java
  27. 1 1
      schedule-producer/src/main/resources/application.yml

+ 0 - 55
parser/.dockerignore

@@ -1,55 +0,0 @@
-# Git相关
-.git
-.gitignore
-.gitattributes
-
-# Python相关
-__pycache__
-*.pyc
-*.pyo
-*.pyd
-.Python
-*.egg-info/
-dist/
-build/
-*.egg
-.pytest_cache/
-.coverage
-htmlcov/
-
-# 虚拟环境
-venv/
-env/
-ENV/
-.venv
-
-# IDE相关
-.vscode/
-.idea/
-*.swp
-*.swo
-*~
-.DS_Store
-
-# 输出文件
-output/
-logs/
-*.log
-
-# 临时文件
-tmp/
-temp/
-*.tmp
-
-# 文档
-*.md
-doc/
-docs/
-
-# 开发工具
-.editorconfig
-.pre-commit-config.yaml
-Makefile
-
-# 不需要的目录
-.github/

+ 0 - 46
parser/dockerfile

@@ -1,46 +0,0 @@
-# 基础镜像:使用Python官方镜像(更稳定)
-FROM python:3.11-slim
-
-# 设置工作目录
-WORKDIR /app
-
-# 环境变量
-ENV PYTHONIOENCODING=utf-8 \
-    PIP_DEFAULT_TIMEOUT=120 \
-    PIP_DISABLE_PIP_VERSION_CHECK=1 \
-    DEBIAN_FRONTEND=noninteractive \
-    PYTHONUNBUFFERED=1
-
-# ========== 系统依赖安装 ==========
-RUN sed -i 's|deb.debian.org|mirrors.ustc.edu.cn|g' /etc/apt/sources.list.d/debian.sources && \
-    apt-get update && \
-    apt-get install -y --no-install-recommends \
-    libglib2.0-0 \
-    libsm6 \
-    libxext6 \
-    libxrender-dev \
-    libgomp1 \
-    libgthread-2.0-0 \
-    libgtk-3-0 \
-    libgstreamer1.0-0 \
-    libgstreamer-plugins-base1.0-0 \
-    ffmpeg \
-    ca-certificates \
-    && rm -rf /var/lib/apt/lists/*
-
-# ========== 配置pip源(国内加速) ==========
-RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && \
-    pip config set global.trusted-host mirrors.aliyun.com
-
-# ========== 安装Python依赖(包括所有系统库的Python包装) ==========
-COPY requirements.txt .
-RUN pip install --no-cache-dir -r requirements.txt
-
-# ========== 复制应用代码 ==========
-COPY . .
-
-# ========== 暴露端口 ==========
-EXPOSE 8000
-
-# ========== 启动应用 ==========
-CMD ["python", "-m", "uvicorn", "parse_service:app", "--host", "0.0.0.0", "--port", "8000"]

BIN
parser/examples/test1.pdf


+ 27 - 9
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -1,7 +1,9 @@
 package cn.com.yusys.consumer.listener;
 
 import cn.com.yusys.consumer.model.Task;
+import cn.com.yusys.consumer.model.TaskRecordRequest;
 import cn.com.yusys.consumer.util.ParseServiceClient;
+import cn.com.yusys.consumer.util.TaskRecordClient;
 import cn.com.yusys.consumer.util.response.ExecuteResponse;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
@@ -21,6 +23,9 @@ public class MessageListener {
     @Autowired
     private ParseServiceClient parseServiceClient;
 
+    @Autowired
+    private TaskRecordClient taskRecordClient;
+
     // 从配置读取逗号分隔的 Topic 字符串
     @Value("${kafka.topics.listen}")
     private String topicsConfig;
@@ -35,7 +40,7 @@ public class MessageListener {
         log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
                 record.topic(), record.key(), message, record.offset());
 
-        ExecuteResponse executeResponse = processBusinessLogic(message);
+        ExecuteResponse executeResponse = processBusinessLogic(message, record);
 
         if (executeResponse != null && executeResponse.getCode()==200) {
             log.info("=== [CONSUMER] 任务处理成功 ===");
@@ -58,17 +63,30 @@ public class MessageListener {
 
     }
 
-    private ExecuteResponse processBusinessLogic(String message) {
-        if ("error".equals(message)) {
-            throw new RuntimeException("Simulated Business Exception");
-        }
-        
+    private ExecuteResponse processBusinessLogic(String message, ConsumerRecord<?, ?> record) {
+
         try {
+            // 创建任务
+            String taskId = UUID.randomUUID().toString();
+            Task task = new Task();
+            task.setTaskId(taskId);
+            task.setFilePath(message);
+            
+            // 创建任务记录
+            String logPath = "logs/tasks/" + taskId + ".log";
+            TaskRecordRequest taskRecordRequest = new TaskRecordRequest();
+            taskRecordRequest.setTaskId(taskId);
+            taskRecordRequest.setTopic(record.topic());
+            taskRecordRequest.setPartition(record.partition());
+            taskRecordRequest.setOffset(record.offset());
+            taskRecordRequest.setLogPath(logPath);
+            boolean recordCreated = taskRecordClient.createTaskRecord(taskRecordRequest);
+            if (!recordCreated) {
+                log.warn("创建任务记录失败,任务ID:{}", taskId);
+            }
+            
             // 调用ParseServiceClient执行任务
             log.info("开始调用execute接口执行任务,消息内容:{}", message);
-            Task task=new Task();
-            task.setTaskId(UUID.randomUUID().toString());
-            task.setFilePath(message);
             ExecuteResponse response = parseServiceClient.executeTask(task);
             log.info("任务执行完成,响应:{}", response);
             return response;

+ 50 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/model/TaskRecordRequest.java

@@ -0,0 +1,50 @@
+package cn.com.yusys.consumer.model;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+/**
+ * 任务记录请求类,用于封装任务记录的相关信息
+ */
+@Data
+public class TaskRecordRequest {
+
+    /**
+     * 任务ID,用于唯一标识一个任务
+     */
+    private String taskId;
+
+    /**
+     * 消息主题,通常用于消息队列中的主题分类
+     */
+    private String topic;
+
+    /**
+     * 分区号,通常用于消息队列中的分区机制
+     */
+    private Integer partition;
+
+    /**
+     * 偏移量,用于标识消息在分区中的位置
+     */
+    private Long offset;
+
+    /**
+     * 任务状态,用于表示任务的当前执行状态
+     */
+    private Integer status;
+
+    /**
+     * 创建时间,记录任务记录的创建时间
+     * 使用Java 8的LocalDateTime类来精确到纳秒的时间表示
+     */
+    private LocalDateTime createTime;
+
+    /**
+     * 日志路径,用于存储任务执行日志的文件路径
+     */
+    private String logPath;
+
+
+}

+ 2 - 2
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java

@@ -21,7 +21,7 @@ public class ParseServiceClient {
 
     private final WebClient webClient;
 
-    @Value("${url.execute}:\"http://127.0.0.1:8083//api/manager/parse\"")
+    @Value("${url.execute}:\"http://127.0.0.1:8083//api/task/parse\"")
     private String executeUrl ;
 
     // 通过构造函数注入WebClient.Builder,利用Spring自动配置
@@ -37,7 +37,7 @@ public class ParseServiceClient {
      * @return 执行结果响应
      */
     public ExecuteResponse executeTask(Task taskData) {
-        String executeUrl = "http://127.0.0.1:8083//api/manager/parse";
+        String executeUrl = "http://127.0.0.1:8083//api/task/parse";
         try {
             ExecuteResponse response = webClient.post()
                     .uri(executeUrl)

+ 64 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/TaskRecordClient.java

@@ -0,0 +1,64 @@
+package cn.com.yusys.consumer.util;
+
+import cn.com.yusys.consumer.model.TaskRecordRequest;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+
+/**
+ * 任务记录接口调用工具
+ */
+@Slf4j
+@Component
+public class TaskRecordClient {
+
+    private final WebClient webClient;
+
+    public TaskRecordClient(WebClient.Builder webClientBuilder) {
+        this.webClient = webClientBuilder
+                .codecs(config -> config.defaultCodecs().maxInMemorySize(1024 * 1024))
+                .build();
+    }
+
+    /**
+     * 创建任务记录
+     * param taskRecordRequest 任务记录请求对象,包含任务ID、主题、分区、偏移量、状态、创建时间和日志路径等信息
+     * @return 是否成功
+     */
+    public boolean createTaskRecord(TaskRecordRequest taskRecordRequest) {
+        String url = "http://127.0.0.1:8083/api/task/create-record";
+        try {
+            String response = webClient.post()
+                    .uri(url)
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .bodyValue(taskRecordRequest)
+                    .retrieve()
+                    .onStatus(HttpStatus::isError, clientResponse ->
+                            Mono.error(new WebClientResponseException(
+                                    "创建任务记录返回异常状态码",
+                                    clientResponse.statusCode().value(),
+                                    clientResponse.statusCode().getReasonPhrase(),
+                                    null, null, null)))
+                    .bodyToMono(String.class)
+                    .timeout(java.time.Duration.ofSeconds(30))
+                    .block();
+
+            log.debug("创建任务记录成功,任务ID:{}", taskRecordRequest.getTaskId());
+            return true;
+        } catch (WebClientResponseException e) {
+            log.error("创建任务记录失败,HTTP状态码:{}", e.getRawStatusCode(), e);
+            return false;
+        } catch (Exception e) {
+            log.error("创建任务记录异常", e);
+            return false;
+        }
+    }
+
+
+
+}

+ 4 - 4
schedule-consumer/src/main/resources/application.yml

@@ -3,11 +3,11 @@ server:
 
 spring:
   application:
-    name: yusp-kafka-consumer
+    name: schedule-consumer
   kafka:
     bootstrap-servers: 10.192.72.13:9092
     consumer:
-      group-id: yusp-topic-group
+      group-id: schedule-topic-group
       enable-auto-commit: false
       auto-offset-reset: earliest
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@@ -16,7 +16,7 @@ spring:
 # 可选:在配置中定义 Topic 列表,方便管理
 kafka:
   topics:
-    listen: schedule-topic
+    listen: task_post_loan_medium
 
 url:
-  execute: "http://127.0.0.1:8083//api/manager/parse"
+  execute: "http://127.0.0.1:8083//api/task/parse"

+ 21 - 4
schedule-manager/pom.xml

@@ -28,10 +28,6 @@
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework.kafka</groupId>
-            <artifactId>spring-kafka</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
         </dependency>
@@ -61,6 +57,27 @@
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
 
+        <!-- MyBatis Plus -->
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-boot-starter</artifactId>
+            <version>3.5.3.1</version>
+        </dependency>
+
+        <!-- MySQL驱动 -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.33</version>
+        </dependency>
+
+        <!-- H2数据库(用于开发测试) -->
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <!-- SpringBoot测试核心依赖 -->
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 2 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/ManagerApplication.java

@@ -1,10 +1,12 @@
 package cn.com.yusys.manager;
 
+import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 @SpringBootApplication
+@MapperScan("cn.com.yusys.manager.repository")
 @EnableScheduling
 public class ManagerApplication {
     public static void main(String[] args) {

+ 33 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/config/ManagerConfig.java

@@ -0,0 +1,33 @@
+package cn.com.yusys.manager.config;
+
+import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
+import cn.com.yusys.manager.instanceManager.Impl.ProcessInstanceManager;
+import cn.com.yusys.manager.instanceManager.InstanceManager;
+import com.github.dockerjava.api.DockerClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.Resource;
+import java.util.Objects;
+
+@Configuration
+
+public class ManagerConfig {
+    // 读取 yml 配置的类型
+    @Value("${parser.instance.type}")
+    private String instanceType;
+
+    @Resource
+    private DockerClient dockerClient;
+
+    // 根据配置动态创建 Bean
+    @Bean
+    public InstanceManager instanceManager() {
+        if(Objects.equals(instanceType, "docker")){
+            return new DockerInstanceManager(dockerClient);
+        }else{
+            return new ProcessInstanceManager();
+        }
+    }
+}

+ 0 - 3
schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java

@@ -39,9 +39,6 @@ public class ParserConfig {
     // Python实例状态接口路径
     public static final String STATUS_API = "/status";
 
-    //实例镜像名称
-    @Value("${parser.instance.image}")
-    public String IMAGE_NAME;
 
     // GPU负载阈值(百分比)
     @Value("${parser.monitor.gpu-load-threshold}")

+ 1 - 23
schedule-manager/src/main/java/cn/com/yusys/manager/controller/ManagerController.java

@@ -22,19 +22,6 @@ public class ManagerController {
     @Autowired
     private InstanceMonitorService parserService;
 
-    @Autowired
-    private TaskLogService taskLogService;
-
-    /**
-     * 接收任务调用,解析实例并执行解析任务
-     * @param request 任务请求对象
-     * @return 任务执行结果
-     */
-    @PostMapping("/parse")
-    public ExecuteResponse executeParseTask(@RequestBody Task request) {
-        return parserService.processMultimodalTask(request);
-    }
-
     /**
      * 获取实例管理信息
      * @return 实例管理信息
@@ -49,18 +36,9 @@ public class ManagerController {
      * @param request 实例配置请求
      * @return 操作结果
      */
-    @PostMapping("/instances/config")
+    @PostMapping("/config")
     public InstanceManagementResponse updateInstanceConfig( @Valid @RequestBody InstanceConfigRequest request) {
         return parserService.updateInstanceConfig(request);
     }
 
-    /**
-     * 查询任务日志
-     * @param taskId 任务ID
-     * @return 任务日志
-     */
-    @GetMapping("/tasks/{taskId}/logs")
-    public TaskLogResponse getTaskLogs(@PathVariable String taskId) {
-        return TaskLogResponse.success(taskId, taskLogService.getTaskLogs(taskId));
-    }
 }

+ 68 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/controller/TaskController.java

@@ -0,0 +1,68 @@
+package cn.com.yusys.manager.controller;
+
+import cn.com.yusys.manager.model.ExecuteResponse;
+import cn.com.yusys.manager.model.Task;
+import cn.com.yusys.manager.model.TaskRecordRequest;
+import cn.com.yusys.manager.service.InstanceMonitorService;
+import cn.com.yusys.manager.service.TaskRecordService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+
+@Slf4j
+@RestController
+@RequestMapping("/api/task")
+@RequiredArgsConstructor
+public class TaskController {
+
+    @Resource
+    private  TaskRecordService taskRecordService;
+
+    @Resource
+    private InstanceMonitorService parserService;
+
+    /**
+     * 创建任务记录
+     * @param taskRecordRequest 请求体
+     * @return 响应
+     */
+    @PostMapping("/create-record")
+    public ResponseEntity<String> createTaskRecord(@RequestBody TaskRecordRequest taskRecordRequest) {
+        try {
+            taskRecordService.createTaskRecord(taskRecordRequest);
+            return ResponseEntity.ok("Task record created successfully");
+        } catch (Exception e) {
+            log.error("创建任务记录失败", e);
+            return ResponseEntity.status(500).body("Failed to create task record: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 更新任务状态
+     * @param taskRecordRequest 请求体
+     * @return 响应
+     */
+    @PostMapping("/update-status")
+    public ResponseEntity<String> updateTaskStatus(@RequestBody TaskRecordRequest taskRecordRequest) {
+        try {
+            taskRecordService.updateStatus(taskRecordRequest);
+            return ResponseEntity.ok("Task status updated successfully");
+        } catch (Exception e) {
+            log.error("更新任务状态失败", e);
+            return ResponseEntity.status(500).body("Failed to update task status: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 接收任务调用,解析实例并执行解析任务
+     * @param request 任务请求对象
+     * @return 任务执行结果
+     */
+    @PostMapping("/parse")
+    public ExecuteResponse executeParseTask(@RequestBody Task request) {
+        return parserService.processMultimodalTask(request);
+    }
+}

+ 59 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/entity/TaskRecordEntity.java

@@ -0,0 +1,59 @@
+package cn.com.yusys.manager.entity;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+import java.time.LocalDateTime;
+
+/**
+ * 解析任务记录实体类
+ * 用于记录从消费任务到解析执行过程中的任务状态变化
+ */
+@Data
+@TableName("task_record")
+public class TaskRecordEntity {
+
+    /**
+     * 主键ID
+     */
+    private Integer id;
+
+    /**
+     * 任务ID
+     */
+    private String taskId;
+
+    /**
+     * Kafka主题
+     */
+    private String topic;
+
+    /**
+     * 分区号
+     */
+    private Integer partitionId;
+
+    /**
+     * 偏移量
+     */
+    private Long offset;
+
+    /**
+     * 任务状态
+     * 0: 下发中(消费者初始化)
+     * 1: 等待解析(管理模块修改)
+     * 2: 解析中(解析实例开始修改)
+     * 3: 解析成功(解析实例结束修改)
+     * 4: 解析失败(解析实例异常修改)
+     */
+    private Integer status;
+
+    /**
+     * 任务创建时间
+     */
+    private LocalDateTime createTime;
+
+    /**
+     * 日志路径
+     */
+    private String logPath;
+}

+ 57 - 12
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/DockerInstanceManager.java

@@ -16,7 +16,6 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 
-@Component
 public class DockerInstanceManager  implements InstanceManager {
 
     @Value("${docker.host:tcp://127.0.0.1:2375}")
@@ -29,17 +28,55 @@ public class DockerInstanceManager  implements InstanceManager {
 
     private final DockerClient client;
 
+    @Value("${parser.instance.image:parser-service:latest}")
+    private String imageName ;
+
     public DockerInstanceManager(DockerClient dockerClient) {
         this.client = dockerClient;
     }
     /**
      * 拉起解析服务实例
-     * @param imageName Python解析服务的镜像名
      * @param port 宿主机映射端口(避免端口冲突)
      * @return 容器ID(用于后续终结实例)
      */
-    public String startParseInstance(String imageName, int port) {
+    public String startParseInstance(int port) {
         try {
+            String containerName = "parser-service" + port;
+
+            // 检查并清理同名容器
+            try {
+                client.listContainersCmd()
+                    .withShowAll(true)
+                    .exec()
+                    .stream()
+                    .filter(container -> {
+                        String[] names = container.getNames();
+                        if (names != null) {
+                            for (String name : names) {
+                                if (name.startsWith("/") && name.substring(1).equals(containerName)) {
+                                    return true;
+                                }
+                            }
+                        }
+                        return false;
+                    })
+                    .findFirst()
+                    .ifPresent(container -> {
+                        log.warn("发现同名容器 {},正在清理...", containerName);
+                        try {
+                            client.stopContainerCmd(container.getId()).withTimeout(10).exec();
+                            client.removeContainerCmd(container.getId())
+                                .withForce(true)
+                                .withRemoveVolumes(true)
+                                .exec();
+                            log.info("成功清理同名容器 {}", containerName);
+                        } catch (Exception e) {
+                            log.error("清理同名容器 {} 失败", containerName, e);
+                        }
+                    });
+            } catch (Exception e) {
+                log.warn("检查同名容器时发生异常,继续创建容器", e);
+            }
 
             HostConfig hostConfig = HostConfig.newHostConfig()
                     .withPortBindings(new PortBinding(
@@ -54,40 +91,48 @@ public class DockerInstanceManager  implements InstanceManager {
                     .withHostConfig(hostConfig)
                     .withExposedPorts(new ExposedPort(8000))
                     .withCmd("python", "parse_service.py", "--host", "0.0.0.0", "--port", "8000")
-                    .withName("python-parser-" + port) // 容器命名,便于识别
+                    .withName(containerName) // 容器命名,便于识别
                     .exec();
 
             // 启动容器
             client.startContainerCmd(container.getId()).exec();
 
-            String containerId = container.getId();
-            log.info("Python实例启动成功,容器ID:{},映射端口:{}", containerId, port);
-            return containerId;
+            String instanceId = container.getId();
+            log.info("Python实例启动成功,实例ID:{},映射端口:{}", instanceId, port);
+            return instanceId;
         } catch (Exception e) {
             log.error("Python实例启动失败:{}", e.getMessage());
             throw new RuntimeException("拉起Python实例失败:" + e.getMessage());
         }
     }
 
-    public void terminateInstance(String containerId) {
+    @Override
+    public void terminateInstance(String instanceId) {
         try {
             // 1. 尝试优雅停止 (等待 10 秒)
-            StopContainerCmd stopCmd = client.stopContainerCmd(containerId);
+            StopContainerCmd stopCmd = client.stopContainerCmd(instanceId);
             stopCmd.withTimeout(20);
             stopCmd.exec();
 
-            log.info("实例已停止: {}", containerId);
+            log.info("实例已停止: {}", instanceId);
 
             // 2. 移除容器 (释放资源)
-            client.removeContainerCmd(containerId)
+            client.removeContainerCmd(instanceId)
                     .withForce(true) // 如果还在运行则强制移除
                     .withRemoveVolumes(true) // 清理匿名卷
                     .exec();
 
-            log.info("实例容器已移除: {}", containerId);
+            log.info("实例容器已移除: {}", instanceId);
         } catch (Exception e) {
             log.info("终结实例失败: {}", e.getMessage());
         }
     }
 
+    @Override
+    public Long getPid(String instanceId) {
+        // Docker容器模式下不直接管理进程PID
+        log.warn("Docker模式下不支持获取进程PID");
+        return -1L;
+    }
+
 }

+ 320 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/ProcessInstanceManager.java

@@ -0,0 +1,320 @@
+package cn.com.yusys.manager.instanceManager.Impl;
+
+import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
+import cn.com.yusys.manager.instanceManager.InstanceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProcessInstanceManager implements InstanceManager {
+
+    private static final Logger log = LoggerFactory.getLogger(ProcessInstanceManager.class);
+
+    // 用于消费子进程输出流的线程池,避免阻塞主进程
+    private final ExecutorService streamGobblerExecutor = Executors.newCachedThreadPool(r -> {
+        Thread t = new Thread(r, "Process-Stream-Gobbler");
+        t.setDaemon(true);
+        return t;
+    });
+
+
+    // 存储实例ID与进程信息的映射
+    // 建议使用自定义对象包裹 Process,以便管理更多状态
+    private final Map<String, ProcessInfo> processMap = new ConcurrentHashMap<>();
+
+    @Value("${parser.instance.script-path:parse_service.py}")
+    private String scriptPath;
+
+    @Value("${parser.instance.python-path:python3}")
+    private String pythonPath;
+
+    @Value("${parser.instance.work-dir:.}")
+    private String workDir;
+
+    @PostConstruct
+    public void init() {
+        // 获取项目根目录
+        File projectRoot = getProjectRoot();
+        
+        // 解析工作目录
+        File dir;
+        if (workDir.startsWith(".")) {
+            // 相对路径,基于项目根目录解析
+            dir = new File(projectRoot, workDir.substring(1));
+        } else if (workDir.startsWith("..")) {
+            // 相对路径,基于项目根目录解析
+            dir = new File(projectRoot, workDir);
+        } else {
+            // 绝对路径或相对路径,直接使用
+            dir = new File(workDir);
+            if (!dir.isAbsolute()) {
+                // 如果不是绝对路径,基于项目根目录解析
+                dir = new File(projectRoot, workDir);
+            }
+        }
+        
+        if (!dir.exists() || !dir.isDirectory()) {
+            log.error("工作目录不存在或不是目录:{}", dir.getAbsolutePath());
+            // 根据策略决定是否抛出异常阻止启动
+            throw new IllegalStateException("Invalid work directory: " + dir.getAbsolutePath());
+        }
+        
+        // 更新 workDir 为绝对路径
+        this.workDir = dir.getAbsolutePath();
+        log.info("ProcessInstanceManager 初始化完成,工作目录:{}", workDir);
+    }
+    
+    /**
+     * 获取项目根目录(包含 parser 目录的目录)
+     * @return 项目根目录
+     */
+    private File getProjectRoot() {
+        try {
+            // 获取当前类的路径
+            String path = getClass().getProtectionDomain().getCodeSource().getLocation().toURI().getPath();
+            File file = new File(path);
+            
+            // 如果是文件(如 JAR 包),获取其父目录
+            if (file.isFile()) {
+                file = file.getParentFile();
+            }
+            
+            // 向上查找项目根目录(同时包含 pom.xml 和 parser 目录)
+            while (file != null) {
+                File pomFile = new File(file, "pom.xml");
+                File parserDir = new File(file, "parser");
+                if (pomFile.exists() && parserDir.exists() && parserDir.isDirectory()) {
+                    return file;
+                }
+                file = file.getParentFile();
+            }
+            
+            // 如果找不到,尝试使用当前工作目录
+            File currentDir = new File(System.getProperty("user.dir"));
+            File parserDir = new File(currentDir, "parser");
+            if (parserDir.exists() && parserDir.isDirectory()) {
+                return currentDir;
+            }
+            
+            // 如果当前工作目录也没有 parser 目录,尝试向上查找
+            File parentDir = currentDir.getParentFile();
+            if (parentDir != null) {
+                parserDir = new File(parentDir, "parser");
+                if (parserDir.exists() && parserDir.isDirectory()) {
+                    return parentDir;
+                }
+            }
+            
+            // 如果都找不到,返回当前工作目录
+            log.warn("无法找到包含 parser 目录的项目根目录,使用当前工作目录");
+            return currentDir;
+        } catch (Exception e) {
+            log.warn("无法获取项目根目录,使用当前工作目录", e);
+            return new File(System.getProperty("user.dir"));
+        }
+    }
+
+    @Override
+    public String startParseInstance(int port) {
+        String instanceId = "python-parser-" + port;
+
+        // 双重检查锁或直接覆盖,这里选择先清理
+        if (processMap.containsKey(instanceId)) {
+            log.warn("实例 {} 已存在,正在强制清理...", instanceId);
+            terminateInstance(instanceId);
+        }
+
+        // 解析绝对路径
+        Path scriptFullPath = Paths.get(workDir, scriptPath).toAbsolutePath();
+        if (!Files.exists(scriptFullPath)) {
+            throw new RuntimeException("解析脚本不存在:" + scriptFullPath);
+        }
+
+        ProcessBuilder processBuilder = new ProcessBuilder(
+                pythonPath,
+                scriptFullPath.toString(),
+                "--host", "0.0.0.0",
+                "--port", String.valueOf(port)
+        );
+
+        processBuilder.directory(new File(workDir));
+        // 不自动合并流,我们需要分别处理或统一由 Gobbler 处理,这里保持合并方便处理
+        processBuilder.redirectErrorStream(true);
+
+        try {
+            Process process = processBuilder.start();
+
+            // 【关键修复】启动后台线程消费输出流,防止阻塞
+            streamGobblerExecutor.submit(() -> gobbleStream(process, instanceId));
+
+            // 简单延迟检查,确认进程没有立即退出 (可选,更严谨的做法是异步监听 exitCode)
+            // 这里暂不 block 等待,假设启动即成功,依靠后续健康检查或日志发现崩溃
+
+            long pid = getPid(process);
+            ProcessInfo info = new ProcessInfo(process, instanceId, System.currentTimeMillis(), pid);
+            processMap.put(instanceId, info);
+
+            log.info("Python 实例启动成功,ID: {}, PID: {}, 端口:{}", instanceId, getPid(process), port);
+            return instanceId;
+
+        } catch (IOException e) {
+            log.error("启动 Python 实例失败:{}", e.getMessage(), e);
+            throw new RuntimeException("拉起 Python 实例失败:" + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void terminateInstance(String instanceId) {
+        ProcessInfo info = processMap.remove(instanceId);
+        if (info == null) {
+            log.warn("未找到实例:{}", instanceId);
+            return;
+        }
+
+        Process process = info.process;
+        if (!process.isAlive()) {
+            log.info("实例 {} 已经退出,无需终止", instanceId);
+            return;
+        }
+
+        log.info("正在终止实例:{} (PID: {})", instanceId, getPid(process));
+
+        // 1. 尝试优雅终止
+        process.destroy();
+        try {
+            if (!process.waitFor(10, TimeUnit.SECONDS)) {
+                // 2. 强制终止
+                log.warn("实例 {} 未在 10s 内退出,执行 kill -9", instanceId);
+                process.destroyForcibly();
+                process.waitFor(5, TimeUnit.SECONDS);
+            }
+            log.info("实例 {} 已正常终止", instanceId);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("等待实例 {} 终止时被中断", instanceId, e);
+            process.destroyForcibly();
+        }
+    }
+
+    /**
+     * 后台消费子进程的输出流,防止缓冲区满导致子进程阻塞
+     */
+    private void gobbleStream(Process process, String instanceId) {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                // 将 Python 的日志转发到 Java 的日志系统,带上实例ID便于排查
+                log.info("[Instance-{}] {}", instanceId, line);
+            }
+        } catch (IOException e) {
+            // 进程结束时流会关闭,这是正常的
+            if (process.isAlive()) {
+                log.error("读取实例 {} 输出流时发生异常", instanceId, e);
+            }
+        } finally {
+            // 进程结束后,可以从 map 中清理(如果需要自动清理已退出的进程)
+            // 这里可以选择移除,或者保留状态供查询
+            log.debug("实例 {} 的输出流读取线程结束", instanceId);
+            // 可选:如果是意外退出,可以在这里回调通知注册中心更新状态
+            checkAndRemoveIfExited(instanceId, process);
+        }
+    }
+
+    private void checkAndRemoveIfExited(String instanceId, Process process) {
+        // 简单的延迟检查,实际生产中可能需要更复杂的回调机制
+        try {
+            Thread.sleep(100);
+            if (!process.isAlive()) {
+                int exitCode = process.exitValue();
+                log.error("实例 {} 意外退出,退出码:{}", instanceId, exitCode);
+                processMap.remove(instanceId); // 移除死进程
+                // TODO: 通知 instanceStateRegistry 更新状态为 FAILED
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public Map<String, Process> getAllProcesses() {
+        // 返回只读视图或转换后的 Map,暴露内部 Process 对象需谨慎
+        Map<String, Process> result = new HashMap<>();
+        processMap.forEach((k, v) -> {
+            if (v.process.isAlive()) {
+                result.put(k, v.process);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * 应用关闭时,清理所有子进程
+     */
+    @PreDestroy
+    public void destroyAll() {
+        log.info("正在关闭 ProcessInstanceManager,终止所有活跃实例...");
+        for (String id : new HashMap<>(processMap).keySet()) {
+            terminateInstance(id);
+        }
+        streamGobblerExecutor.shutdownNow();
+        log.info("所有解析实例已清理完毕");
+    }
+
+    // 内部类,用于封装进程信息
+    private static class ProcessInfo {
+        final Process process;
+        final String instanceId;
+        final long startTime;
+        final Long pid; // 存储进程PID
+
+        public ProcessInfo(Process process, String instanceId, long startTime, Long pid) {
+            this.process = process;
+            this.instanceId = instanceId;
+            this.startTime = startTime;
+            this.pid = pid;
+        }
+    }
+
+    @Override
+    public Long getPid(String instanceId) {
+        ProcessInfo info = processMap.get(instanceId);
+        if (info == null) {
+            log.warn("未找到实例:{}", instanceId);
+            return -1L;
+        }
+        return info.pid;
+    }
+
+    private long getPid(Process process) {
+        if (process == null) return -1;
+
+        // 尝试通过反射获取 (主要针对 Linux/Mac 的 UNIXProcess)
+        if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
+            try {
+                java.lang.reflect.Field field = process.getClass().getDeclaredField("pid");
+                field.setAccessible(true);
+                return field.getLong(process);
+            } catch (Exception e) {
+                log.warn("反射获取 PID 失败", e);
+            }
+        }
+
+        // 如果是 Windows 或其他情况,Java 8 很难直接获取 PID
+        // 如果项目确定只跑在 Linux/Mac 上,上面的代码就够了。
+        // 如果需要支持 Windows Java 8,建议忽略 PID 日志或升级 JDK。
+
+        return -1;
+    }
+}

+ 3 - 2
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/InstanceManager.java

@@ -2,6 +2,7 @@ package cn.com.yusys.manager.instanceManager;
 
 // 实例管理器抽象接口
 public interface InstanceManager {
-    String startParseInstance(String imageName, int port) ;
-    void terminateInstance(String containerId) ;
+    String startParseInstance(int port) ;
+    void terminateInstance(String instanceId) ;
+    Long getPid(String instanceId);
 }

+ 50 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/TaskRecordRequest.java

@@ -0,0 +1,50 @@
+package cn.com.yusys.manager.model;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+/**
+ * 任务记录请求类,用于封装任务记录的相关信息
+ */
+@Data
+public class TaskRecordRequest {
+
+    /**
+     * 任务ID,用于唯一标识一个任务
+     */
+    private String taskId;
+
+    /**
+     * 消息主题,通常用于消息队列中的主题分类
+     */
+    private String topic;
+
+    /**
+     * 分区号,通常用于消息队列中的分区机制
+     */
+    private Integer partition;
+
+    /**
+     * 偏移量,用于标识消息在分区中的位置
+     */
+    private Long offset;
+
+    /**
+     * 任务状态,用于表示任务的当前执行状态
+     */
+    private Integer status;
+
+    /**
+     * 创建时间,记录任务记录的创建时间
+     * 使用Java 8的LocalDateTime类来精确到纳秒的时间表示
+     */
+    private LocalDateTime createTime;
+
+    /**
+     * 日志路径,用于存储任务执行日志的文件路径
+     */
+    private String logPath;
+
+
+}

+ 12 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/repository/TaskRecordMapper.java

@@ -0,0 +1,12 @@
+package cn.com.yusys.manager.repository;
+
+import cn.com.yusys.manager.entity.TaskRecordEntity;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * 任务记录Mapper接口
+ */
+@Mapper
+public interface TaskRecordMapper extends BaseMapper<TaskRecordEntity> {
+}

+ 70 - 41
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -2,13 +2,8 @@ package cn.com.yusys.manager.service;
 
 import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
 import cn.com.yusys.manager.config.ParserConfig;
-import cn.com.yusys.manager.model.ExecuteResponse;
-import cn.com.yusys.manager.model.InstanceStatus;
-import cn.com.yusys.manager.model.InstanceStatusResponse;
-import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
-import cn.com.yusys.manager.model.Task;
-import cn.com.yusys.manager.model.InstanceManagementResponse;
-import cn.com.yusys.manager.model.InstanceConfigRequest;
+import cn.com.yusys.manager.instanceManager.InstanceManager;
+import cn.com.yusys.manager.model.*;
 import cn.com.yusys.manager.util.ParseInstanceClient;
 import cn.com.yusys.manager.common.PortPool;
 import lombok.RequiredArgsConstructor;
@@ -45,9 +40,9 @@ public class InstanceMonitorService {
     @Resource
     private final ParserConfig parserConfig;
 
-    //docker实例管理器
+    //实例管理器
     @Resource
-    private DockerInstanceManager dockerInstanceManager;
+    private InstanceManager instanceManager;
 
     // 注入独立的端口池管理工具类
     @Resource
@@ -57,6 +52,10 @@ public class InstanceMonitorService {
     @Resource
     private TaskLogService taskLogService;
 
+    // 注入任务记录服务
+    @Resource
+    private TaskRecordService taskRecordService;
+
     // 最大重试次数
     @org.springframework.beans.factory.annotation.Value("${parser.task.max-retry:3}")
     private int maxRetry;
@@ -69,14 +68,14 @@ public class InstanceMonitorService {
             // 4. 使用PortPool分配端口
             Integer port = portPool.allocatePort();
             if(port != null){
-                String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
+                String instanceId = instanceManager.startParseInstance(port);
                 // 增加容器ID空值校验
-                if (containerId == null || containerId.isEmpty()) {
-                    log.error("初始化实例失败:Docker容器创建失败,端口:{}", port);
+                if (instanceId == null || instanceId.isEmpty()) {
+                    log.error("初始化实例失败:进程创建失败,端口:{}", port);
                     portPool.releasePort(port); // 归还端口
                     continue;
                 }
-                InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+                InstanceStatus instanceStatus = saveInstanceStatus(instanceId, port);
             } else {
                 log.error("初始化实例失败:无可用端口");
                 break;
@@ -130,11 +129,11 @@ public class InstanceMonitorService {
                 activeInstancePool.remove(instanceId);
                 // 使用PortPool释放端口
                 portPool.releasePort(state.getPort());
-                // 捕获Docker操作异常
+                // 捕获进程操作异常
                 try {
-                    dockerInstanceManager.terminateInstance(state.getContainerId());
+                    instanceManager.terminateInstance(instanceId);
                 } catch (Exception e) {
-                    log.error("终止实例{}的Docker容器失败", instanceId, e);
+                    log.error("终止实例{}的进程失败", instanceId, e);
                 }
             }
         });
@@ -222,14 +221,14 @@ public class InstanceMonitorService {
                 // 使用PortPool分配端口
                 Integer port = portPool.allocatePort();
                 if(port != null){
-                    String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
-                    // 增加容器ID空值校验
-                    if (containerId == null || containerId.isEmpty()) {
-                        log.error("创建实例失败:Docker容器创建失败,端口:{}", port);
+                    String instanceId = instanceManager.startParseInstance(port);
+                    // 增加实例ID空值校验
+                    if (instanceId == null || instanceId.isEmpty()) {
+                        log.error("创建实例失败:进程创建失败,端口:{}", port);
                         portPool.releasePort(port); // 归还端口
                         continue;
                     }
-                    InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
+                    InstanceStatus instanceStatus = saveInstanceStatus(instanceId, port);
                 } else {
                     log.error("创建实例失败:无可用端口");
                     break;
@@ -317,17 +316,19 @@ public class InstanceMonitorService {
     }
 
     //保存实例状态
-    private InstanceStatus saveInstanceStatus(String containerId,Integer port) {
+    private InstanceStatus saveInstanceStatus(String instanceId, Integer port) {
         InstanceStatus instanceStatus = new InstanceStatus();
         instanceStatus.setIp("127.0.0.1");
         instanceStatus.setPort(port);
         instanceStatus.setLastHeartbeatTime(System.currentTimeMillis());
         instanceStatus.setStatus(0);
+        instanceStatus.setInstanceId(instanceId);
+        // 获取并设置进程PID
+        Long pid = instanceManager.getPid(instanceId);
+        instanceStatus.setPid(pid);
 
         Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
-
-        instanceStatus.setContainerId(containerId);
-        activeInstancePool.put(containerId, instanceStatus);
+        activeInstancePool.put(instanceId, instanceStatus);
         return instanceStatus;
     }
 
@@ -343,15 +344,15 @@ public class InstanceMonitorService {
             // 使用PortPool分配端口
             Integer port = portPool.allocatePort();
             if(port != null){
-                String containerId = dockerInstanceManager.startParseInstance(parserConfig.IMAGE_NAME, port);
-                // 增加容器ID空值校验
-                if (containerId == null || containerId.isEmpty()) {
-                    log.error("GPU扩容实例失败:Docker容器创建失败,端口:{}", port);
+                String instanceId = instanceManager.startParseInstance(port);
+                // 增加实例ID空值校验
+                if (instanceId == null || instanceId.isEmpty()) {
+                    log.error("GPU扩容实例失败:进程创建失败,端口:{}", port);
                     portPool.releasePort(port); // 归还端口
                     continue;
                 }
-                InstanceStatus instanceStatus = saveInstanceStatus(containerId, port);
-                log.info("基于GPU负载扩容,已创建实例,容器ID:{},端口:{}", containerId, port);
+                InstanceStatus instanceStatus = saveInstanceStatus(instanceId, port);
+                log.info("基于GPU负载扩容,已创建实例,实例ID:{},端口:{}", instanceId, port);
             } else {
                 log.warn("端口池已满,无法继续GPU扩容");
                 break;
@@ -387,10 +388,10 @@ public class InstanceMonitorService {
                 activeInstancePool.remove(instanceId);
                 // 释放端口
                 portPool.releasePort(instanceStatus.getPort());
-                // 关闭Docker容器
-                dockerInstanceManager.terminateInstance(instanceStatus.getContainerId());
+                // 关闭进程
+                instanceManager.terminateInstance(instanceId);
 
-                log.info("缩容实例{}成功,已关闭容器并释放端口{}", instanceId, instanceStatus.getPort());
+                log.info("缩容实例{}成功,已关闭进程并释放端口{}", instanceId, instanceStatus.getPort());
             } catch (Exception e) {
                 log.error("缩容实例{}失败", instanceId, e);
                 // 缩容失败时,将实例重新加入活跃池(避免端口丢失)
@@ -408,6 +409,12 @@ public class InstanceMonitorService {
         String taskId = task.getTaskId();
         
         try {
+            TaskRecordRequest taskRecordRequest=new TaskRecordRequest();
+            taskRecordRequest.setTaskId(taskId);
+            taskRecordRequest.setStatus(1);
+            // 更新任务状态为等待解析
+            taskRecordService.updateStatus(taskRecordRequest);
+            
             // 记录任务开始日志
             taskLogService.logTaskStart(taskId, task.getFilePath());
             
@@ -416,17 +423,33 @@ public class InstanceMonitorService {
             if (idleInstance == null) {
                 log.debug("当前无空闲解析实例");
                 taskLogService.logTaskFailure(taskId, "当前无空闲解析实例");
+                // 更新任务状态为解析失败
+                taskRecordRequest.setStatus(4);
+                taskRecordService.updateStatus(taskRecordRequest);
                 return ExecuteResponse.fail(300,"当前无空闲解析实例");
             }
             
+            // 更新任务状态为解析中
+            taskRecordRequest.setStatus(2);
+            taskRecordService.updateStatus(taskRecordRequest);
+            
             // 记录实例分配日志
-            taskLogService.logInstanceAllocation(taskId, idleInstance.getContainerId());
+            taskLogService.logInstanceAllocation(taskId, idleInstance.getInstanceId());
 
             //  执行任务解析
             ExecuteResponse response = executeTaskWithRetry(idleInstance, task);
             
+            // 根据执行结果更新任务状态
+            if ( response.getCode() == 200) {
+                taskRecordRequest.setStatus(3);
+                taskRecordService.updateStatus(taskRecordRequest); // 解析成功
+            } else {
+                taskRecordRequest.setStatus(4);
+                taskRecordService.updateStatus(taskRecordRequest); // 解析失败
+            }
+            
             // 记录任务完成日志
-            if (response != null && response.getCode() == 200) {
+            if (response.getCode() == 200) {
                 taskLogService.logTaskComplete(taskId, response.getMessage());
             } else {
                 taskLogService.logTaskFailure(taskId, response != null ? response.getMessage() : "未知错误");
@@ -437,6 +460,11 @@ public class InstanceMonitorService {
         } catch (Exception e) {
             log.error("多模态任务解析定时任务执行失败", e);
             taskLogService.logTaskFailure(taskId, e.getMessage());
+            // 更新任务状态为解析失败
+            TaskRecordRequest taskRecordRequest=new TaskRecordRequest();
+            taskRecordRequest.setTaskId(taskId);
+            taskRecordRequest.setStatus(4);
+            taskRecordService.updateStatus(taskRecordRequest);
             return ExecuteResponse.fail("多模态任务解析定时任务执行失败");
         }
     }
@@ -456,7 +484,7 @@ public class InstanceMonitorService {
      * 执行任务并处理重试逻辑
      */
     private ExecuteResponse executeTaskWithRetry(InstanceStatus instance, Task task) {
-        String instanceId = instance.getContainerId();
+        String instanceId = instance.getInstanceId();
         int retryCount = 0;
         // 标记实例为运行中
         instance.setStatus(1);
@@ -502,11 +530,11 @@ public class InstanceMonitorService {
                     task);
 
             if (response == null || response.getCode() != 200) {
-                log.warn("调用解析器返回失败,实例:{},响应:{}", instance.getContainerId(), response);
+                log.warn("调用解析器返回失败,实例:{},响应:{}", instance.getInstanceId(), response);
             }
             return response;
         } catch (Exception e) {
-            log.error("调用解析器失败,实例:{}", instance.getContainerId(), e);
+            log.error("调用解析器失败,实例:{}", instance.getInstanceId(), e);
             throw e;
         }
     }
@@ -533,7 +561,7 @@ public class InstanceMonitorService {
             // 构建实例详情列表
             List<InstanceManagementResponse.InstanceDetail> instanceDetails = activeInstancePool.values().stream()
                     .map(status -> InstanceManagementResponse.InstanceDetail.builder()
-                            .containerId(status.getContainerId())
+                            .instanceId(status.getInstanceId())
                             .ip(status.getIp())
                             .port(status.getPort())
                             .status(status.getStatus())
@@ -591,4 +619,5 @@ public class InstanceMonitorService {
         }
     }
 
-}
+}
+

+ 12 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/TaskRecordService.java

@@ -0,0 +1,12 @@
+package cn.com.yusys.manager.service;
+
+import cn.com.yusys.manager.entity.TaskRecordEntity;
+import cn.com.yusys.manager.model.TaskRecordRequest;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+public interface TaskRecordService extends IService<TaskRecordEntity> {
+
+    TaskRecordEntity createTaskRecord(TaskRecordRequest taskRecordRequest);
+
+    void updateStatus(TaskRecordRequest taskRecordRequest);
+}

+ 43 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/impl/TaskRecordServiceImpl.java

@@ -0,0 +1,43 @@
+package cn.com.yusys.manager.service.impl;
+
+import cn.com.yusys.manager.entity.TaskRecordEntity;
+import cn.com.yusys.manager.model.TaskRecordRequest;
+import cn.com.yusys.manager.repository.TaskRecordMapper;
+import cn.com.yusys.manager.service.TaskRecordService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+
+import java.time.LocalDateTime;
+
+@Service
+public class TaskRecordServiceImpl extends ServiceImpl<TaskRecordMapper, TaskRecordEntity> implements TaskRecordService {
+
+    @Override
+    public TaskRecordEntity createTaskRecord(TaskRecordRequest taskRecordRequest) {
+        TaskRecordEntity record = new TaskRecordEntity();
+        record.setTaskId(taskRecordRequest.getTaskId());
+        record.setTopic(taskRecordRequest.getTopic());
+        record.setPartitionId(taskRecordRequest.getPartition());
+        record.setOffset(taskRecordRequest.getOffset());
+        record.setStatus(0);
+        record.setCreateTime(LocalDateTime.now());
+        record.setLogPath(taskRecordRequest.getLogPath());
+        save(record);
+        return record;
+    }
+
+    @Override
+    public void updateStatus( TaskRecordRequest taskRecordRequest) {
+        QueryWrapper<TaskRecordEntity> wrapper = new QueryWrapper<>();
+        wrapper.eq("task_id", taskRecordRequest.getTaskId());
+        TaskRecordEntity record = getOne(wrapper);
+        
+        if (record != null) {
+            record.setStatus(taskRecordRequest.getStatus());
+            updateById(record);
+        } else {
+            throw new RuntimeException("Task not found: " + taskRecordRequest.getTaskId());
+        }
+    }
+}

+ 25 - 16
schedule-manager/src/main/resources/application.yml

@@ -3,22 +3,23 @@ server:
 
 spring:
   application:
-    name: yusp-kafka-manager
-  kafka:
-    bootstrap-servers: 10.192.72.13:9092
-    consumer:
-      group-id: yusp-topic-group
-      enable-auto-commit: false
-      auto-offset-reset: earliest
-      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    name: schedule-manager
+  
+  # 数据源配置
+  datasource:
+    url: jdbc:mysql://10.192.72.13:7289/multimodel?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
+    driver-class-name: com.mysql.cj.jdbc.Driver
+    username: root
+    password: 123456
+  
+  # MyBatis Plus配置
+  mybatis-plus:
+    mapper-locations: classpath*:/mapper/**/*.xml
+    type-aliases-package: cn.com.yusys.manager.entity
+    configuration:
+      map-underscore-to-camel-case: true
+      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
 
-# 可选:在配置中定义 Topic 列表,方便管理
-kafka:
-  topics:
-    listen: test-topic
-    task: task-topic
-    failed: task-failed
 
 docker:
   host: tcp://127.0.0.1:2375 # Docker Daemon地址
@@ -58,7 +59,15 @@ parser:
     # Java 管理端的访问地址(用于 Python 实例注册回调)
     url: http://localhost:8080
 
-  # 实例镜像配置
+  # 实例配置
   instance:
     # 解析服务实例的 Docker 镜像名称
     image: parse-service:latest
+    # Python解析服务脚本路径
+    script-path: parser/parse_service.py
+    # Python解释器路径
+    python-path: parser/venv/bin/python
+    # 工作目录
+    work-dir: .
+    #实例启动方式
+    type: process

+ 1 - 1
schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/DockerInstanceManagerTest.java

@@ -20,7 +20,7 @@ public class DockerInstanceManagerTest {
             int port = 8081; // 宿主机映射端口
 
             // 启动Python容器
-            String containerId = dockerInstanceManager.startParseInstance(imageName, port);
+            String containerId = dockerInstanceManager.startParseInstance(port);
             System.out.println("启动的容器ID:" + containerId);
 
             // 模拟业务执行后,停止并删除容器

+ 44 - 0
schedule-manager/src/test/java/cn/com/yusys/manager/instanceManager/ProcessInstanceManagerTest.java

@@ -0,0 +1,44 @@
+package cn.com.yusys.manager.instanceManager;
+
+
+import cn.com.yusys.manager.instanceManager.Impl.ProcessInstanceManager;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import javax.annotation.Resource;
+
+@SpringBootTest
+public class ProcessInstanceManagerTest {
+
+    @Resource
+    ProcessInstanceManager processInstanceManager;
+
+
+    @Test
+    public void testStartPythonInstance() {
+        try {
+            // 注意:替换为你实际构建的Python镜像名(如python-parser:final)
+            String imageName = "parse-service";
+            int port = 1086; // 宿主机映射端口
+
+            // 启动Python容器
+            String containerId = processInstanceManager.startParseInstance(port);
+            System.out.println("启动的容器ID:" + containerId);
+
+            // 模拟业务执行后,停止并删除容器
+            // dockerInstanceManager.terminateInstance(containerId);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testStopInstance() {
+        try {
+            String containerId = "1ab258aaafb5f6202b05b8ffb20d5a376b0fefc9d5acae9da885339009f76fb4";
+            processInstanceManager.terminateInstance(containerId);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 1 - 1
schedule-producer/src/main/resources/application.yml

@@ -12,7 +12,7 @@ spring:
       acks: all
       retries: 3
     topics:
-      schedule-topic:
+      task_post_loan_medium:
         partitions: 6