Parcourir la source

增加日志和状态查询接口

zsh il y a 3 semaines
Parent
commit
dd9f17c205

+ 1 - 1
.gitignore

@@ -6,7 +6,7 @@ target/
 !**/src/test/**/target/
 .kotlin
 **.log
-
+logs/
 ### IntelliJ IDEA ###
 .idea/modules.xml
 .idea/jarRepositories.xml

+ 15 - 3
README.md

@@ -125,15 +125,27 @@ java -jar schedule-consumer/target/schedule-consumer.jar
 # 启动Manager
 java -jar schedule-manager/target/schedule-manager.jar
 ```
+解析服务镜像打包:
+```bash
+docker build -t parse-service:latest  .
+```
 
-## 使用指南
+## 测试指南
 
 ### 发送任务
 
 通过Producer模块提供的RESTful API发送任务:
 
-```bash
-curl -X POST http://localhost:8081/api/task   -H "Content-Type: application/json"   -d '{"taskData": "your task data"}'
+```
+POST http://localhost:8081/api/send
+
+测试数据
+{
+    "topic":"schedule-topic",
+    "key":"1",
+    "message":"./examples/test1.pdf"
+}
+
 ```
 
 ### 监控任务

+ 10 - 3
parser/parse_service.py

@@ -3,7 +3,7 @@ import time
 import threading
 import argparse
 from fastapi import FastAPI, BackgroundTasks
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 from typing import Dict, Optional, List
 import os
 from core.router import ParserFactory
@@ -25,10 +25,17 @@ task_lock = threading.Lock()
 
 
 # ------------------------------
+# 请求模型定义
+# ------------------------------
+class ExecuteRequest(BaseModel):
+    file_path: str=Field(alias="filePath")
+    task_id:str=Field(alias="taskId")
+
+# ------------------------------
 # 接口定义
 # ------------------------------
 @app.post("/execute", summary="接收解析任务并执行")
-async def execute_task(file_path: str):
+async def execute_task(request: ExecuteRequest):
     """
     接收Java端下发的解析任务,后台异步执行
     """
@@ -36,7 +43,7 @@ async def execute_task(file_path: str):
     factory = ParserFactory()
     
     # 解析文件
-    result = await factory.parse(file_path)
+    result = await factory.parse(request.file_path)
     
     
     print(result.content)

+ 3 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -12,6 +12,8 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
+import java.util.UUID;
+
 @Component
 public class MessageListener {
     private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
@@ -65,6 +67,7 @@ public class MessageListener {
             // 调用ParseServiceClient执行任务
             log.info("开始调用execute接口执行任务,消息内容:{}", message);
             Task task=new Task();
+            task.setTaskId(UUID.randomUUID().toString());
             task.setFilePath(message);
             ExecuteResponse response = parseServiceClient.executeTask(task);
             log.info("任务执行完成,响应:{}", response);

+ 5 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/model/Task.java

@@ -12,4 +12,9 @@ public class Task {
      * 文件路径,用于存储任务关联的文件路径信息
      */
     private String filePath;
+
+    /**
+     * 任务ID,用于存储任务ID
+     */
+    private String taskId;
 }

+ 5 - 2
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java

@@ -2,7 +2,6 @@ package cn.com.yusys.consumer.util;
 
 import cn.com.yusys.consumer.model.Task;
 import cn.com.yusys.consumer.util.response.ExecuteResponse;
-import cn.com.yusys.consumer.util.response.InstanceStatusResponse;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -10,6 +9,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Mono;
+import org.springframework.beans.factory.annotation.Value;
 
 /**
  * 解析服务实例实例接口调用工具
@@ -21,6 +21,9 @@ public class ParseServiceClient {
 
     private final WebClient webClient;
 
+    @Value("${url.execute}:\"http://127.0.0.1:8083//api/manager/parse\"")
+    private String executeUrl ;
+
     // 通过构造函数注入WebClient.Builder,利用Spring自动配置
     public ParseServiceClient(WebClient.Builder webClientBuilder) {
         this.webClient = webClientBuilder
@@ -29,7 +32,7 @@ public class ParseServiceClient {
     }
 
     /**
-     * 调用/execute接口执行任务
+     * 调用接口执行任务
      * @param taskData 任务数据
      * @return 执行结果响应
      */

+ 4 - 1
schedule-consumer/src/main/resources/application.yml

@@ -16,4 +16,7 @@ spring:
 # 可选:在配置中定义 Topic 列表,方便管理
 kafka:
   topics:
-    listen: schedule-topic
+    listen: schedule-topic
+
+url:
+  execute: "http://127.0.0.1:8083//api/manager/parse"

+ 5 - 0
schedule-manager/pom.xml

@@ -55,6 +55,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-webflux</artifactId>
         </dependency>
+        <!-- Spring Boot 校验启动器(自动包含 jakarta.validation 核心包) -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-validation</artifactId>
+        </dependency>
 
         <!-- SpringBoot测试核心依赖 -->
         <dependency>

+ 39 - 4
schedule-manager/src/main/java/cn/com/yusys/manager/controller/ManagerController.java

@@ -3,11 +3,14 @@ package cn.com.yusys.manager.controller;
 import cn.com.yusys.manager.model.ExecuteResponse;
 import cn.com.yusys.manager.model.Task;
 import cn.com.yusys.manager.service.InstanceMonitorService;
+import cn.com.yusys.manager.model.InstanceManagementResponse;
+import cn.com.yusys.manager.model.InstanceConfigRequest;
+import cn.com.yusys.manager.model.TaskLogResponse;
+import cn.com.yusys.manager.service.TaskLogService;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
+
+import javax.validation.Valid;
 
 /**
  * 调度管理控制器
@@ -19,6 +22,9 @@ public class ManagerController {
     @Autowired
     private InstanceMonitorService parserService;
 
+    @Autowired
+    private TaskLogService taskLogService;
+
     /**
      * 接收任务调用,解析实例并执行解析任务
      * @param request 任务请求对象
@@ -28,4 +34,33 @@ public class ManagerController {
     public ExecuteResponse executeParseTask(@RequestBody Task request) {
         return parserService.processMultimodalTask(request);
     }
+
+    /**
+     * 获取实例管理信息
+     * @return 实例管理信息
+     */
+    @GetMapping("/instances")
+    public InstanceManagementResponse getInstanceManagementInfo() {
+        return parserService.getInstanceManagementInfo();
+    }
+
+    /**
+     * 更新实例配置
+     * @param request 实例配置请求
+     * @return 操作结果
+     */
+    @PostMapping("/instances/config")
+    public InstanceManagementResponse updateInstanceConfig( @Valid @RequestBody InstanceConfigRequest request) {
+        return parserService.updateInstanceConfig(request);
+    }
+
+    /**
+     * 查询任务日志
+     * @param taskId 任务ID
+     * @return 任务日志
+     */
+    @GetMapping("/tasks/{taskId}/logs")
+    public TaskLogResponse getTaskLogs(@PathVariable String taskId) {
+        return TaskLogResponse.success(taskId, taskLogService.getTaskLogs(taskId));
+    }
 }

+ 32 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceConfigRequest.java

@@ -0,0 +1,32 @@
+
+package cn.com.yusys.manager.model;
+
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import lombok.Data;
+
+
+
+/**
+ * 实例配置请求实体
+ */
+@Data
+public class InstanceConfigRequest {
+
+    /**
+     * 最小活跃实例数
+     */
+    @NotNull(message = "最小活跃实例数不能为空")
+    @Min(value = 1, message = "最小活跃实例数不能小于1")
+    @Max(value = 50, message = "最小活跃实例数不能大于50")
+    private Integer minActiveInstance;
+
+    /**
+     * 最大活跃实例数
+     */
+    @NotNull(message = "最大活跃实例数不能为空")
+    @Min(value = 1, message = "最大活跃实例数不能小于1")
+    @Max(value = 100, message = "最大活跃实例数不能大于100")
+    private Integer maxActiveInstance;
+}

+ 118 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceManagementResponse.java

@@ -0,0 +1,118 @@
+
+package cn.com.yusys.manager.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * 实例管理响应实体
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class InstanceManagementResponse {
+    /**
+     * 响应码
+     */
+    private Integer code;
+
+    /**
+     * 响应消息
+     */
+    private String message;
+
+
+    private InstanceManagementData data;
+
+    /**
+     * 实例详情
+     */
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class InstanceDetail {
+        /**
+         * 容器ID
+         */
+        private String containerId;
+
+        /**
+         * 实例IP
+         */
+        private String ip;
+
+        /**
+         * 实例端口
+         */
+        private Integer port;
+
+        /**
+         * 实例状态 (0-空闲 1-运行中 2-异常)
+         */
+        private Integer status;
+
+        /**
+         * CPU使用率
+         */
+        private Double cpuUsage;
+
+        /**
+         * 内存使用率
+         */
+        private Double memoryUsage;
+
+        /**
+         * GPU使用率
+         */
+        private Double gpuUsage;
+
+        /**
+         * GPU内存使用量
+         */
+        private Double gpuMemory;
+
+        /**
+         * 上次心跳时间
+         */
+        private Long lastHeartbeatTime;
+    }
+
+    /**
+     * 创建成功响应
+     */
+    public static InstanceManagementResponse success(InstanceManagementData data) {
+        return InstanceManagementResponse.builder()
+                .code(200)
+                .message("操作成功")
+                .data(data)
+                .build();
+    }
+
+    /**
+     * 创建失败响应
+     */
+    public static InstanceManagementResponse fail(String message) {
+        return InstanceManagementResponse.builder()
+                .code(500)
+                .message(message)
+                .build();
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class InstanceManagementData {
+        private Integer totalInstances;      // 总实例数
+        private Integer idleInstances;       // 空闲实例数
+        private Integer runningInstances;    // 运行中实例数
+        private Integer errorInstances;      // 异常实例数
+        private List<InstanceDetail> instanceDetails; // 实例详情列表
+    }
+}

+ 6 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/Task.java

@@ -13,4 +13,10 @@ public class Task {
      * 文件路径,用于指定任务关联的文件位置
      */
     private String filePath;
+
+
+    /**
+     * 任务ID,用于存储任务ID
+     */
+    private String taskId;
 }

+ 60 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/TaskLogResponse.java

@@ -0,0 +1,60 @@
+
+package cn.com.yusys.manager.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * 任务日志响应实体
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskLogResponse {
+    /**
+     * 响应码
+     */
+    private Integer code;
+
+    /**
+     * 响应消息
+     */
+    private String message;
+
+    /**
+     * 任务ID
+     */
+    private String taskId;
+
+    /**
+     * 日志内容列表
+     */
+    private List<String> logs;
+
+    /**
+     * 创建成功响应
+     */
+    public static TaskLogResponse success(String taskId, List<String> logs) {
+        return TaskLogResponse.builder()
+                .code(200)
+                .message("查询成功")
+                .taskId(taskId)
+                .logs(logs)
+                .build();
+    }
+
+    /**
+     * 创建失败响应
+     */
+    public static TaskLogResponse fail(String message) {
+        return TaskLogResponse.builder()
+                .code(500)
+                .message(message)
+                .build();
+    }
+}

+ 111 - 6
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -7,6 +7,8 @@ import cn.com.yusys.manager.model.InstanceStatus;
 import cn.com.yusys.manager.model.InstanceStatusResponse;
 import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
 import cn.com.yusys.manager.model.Task;
+import cn.com.yusys.manager.model.InstanceManagementResponse;
+import cn.com.yusys.manager.model.InstanceConfigRequest;
 import cn.com.yusys.manager.util.ParseInstanceClient;
 import cn.com.yusys.manager.common.PortPool;
 import lombok.RequiredArgsConstructor;
@@ -51,6 +53,10 @@ public class InstanceMonitorService {
     @Resource
     private PortPool portPool;
 
+    // 注入任务日志服务
+    @Resource
+    private TaskLogService taskLogService;
+
     // 最大重试次数
     @org.springframework.beans.factory.annotation.Value("${parser.task.max-retry:3}")
     private int maxRetry;
@@ -399,19 +405,38 @@ public class InstanceMonitorService {
      */
    public ExecuteResponse processMultimodalTask(Task task) {
         Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        String taskId = task.getTaskId();
+        
         try {
+            // 记录任务开始日志
+            taskLogService.logTaskStart(taskId, task.getFilePath());
+            
             //  检查是否有空闲的解析实例
             InstanceStatus idleInstance = findIdleInstance(activeInstancePool);
             if (idleInstance == null) {
                 log.debug("当前无空闲解析实例");
+                taskLogService.logTaskFailure(taskId, "当前无空闲解析实例");
                 return ExecuteResponse.fail(300,"当前无空闲解析实例");
             }
+            
+            // 记录实例分配日志
+            taskLogService.logInstanceAllocation(taskId, idleInstance.getContainerId());
 
             //  执行任务解析
-            return executeTaskWithRetry(idleInstance, task.getFilePath());
+            ExecuteResponse response = executeTaskWithRetry(idleInstance, task);
+            
+            // 记录任务完成日志
+            if (response != null && response.getCode() == 200) {
+                taskLogService.logTaskComplete(taskId, response.getMessage());
+            } else {
+                taskLogService.logTaskFailure(taskId, response != null ? response.getMessage() : "未知错误");
+            }
+            
+            return response;
 
         } catch (Exception e) {
             log.error("多模态任务解析定时任务执行失败", e);
+            taskLogService.logTaskFailure(taskId, e.getMessage());
             return ExecuteResponse.fail("多模态任务解析定时任务执行失败");
         }
     }
@@ -430,7 +455,7 @@ public class InstanceMonitorService {
     /**
      * 执行任务并处理重试逻辑
      */
-    private ExecuteResponse executeTaskWithRetry(InstanceStatus instance, String taskMessage) {
+    private ExecuteResponse executeTaskWithRetry(InstanceStatus instance, Task task) {
         String instanceId = instance.getContainerId();
         int retryCount = 0;
         // 标记实例为运行中
@@ -440,10 +465,10 @@ public class InstanceMonitorService {
             while (retryCount <= maxRetry ) {
                 try {
                     log.info("开始执行任务,实例:{},重试次数:{}/{},任务内容:{}", 
-                            instanceId, retryCount, maxRetry, taskMessage);
+                            instanceId, retryCount, maxRetry, task.getFilePath());
 
                     // 调用解析器执行任务
-                    ExecuteResponse response = callParser(instance, taskMessage);
+                    ExecuteResponse response = callParser(instance, task);
 
                     if (response != null && response.getCode() == 200) {
                         log.info("任务执行成功,实例:{},响应:{}", instanceId, response);
@@ -469,12 +494,12 @@ public class InstanceMonitorService {
     /**
      * 调用解析器执行任务
      */
-    private ExecuteResponse callParser(InstanceStatus instance, String taskMessage) {
+    private ExecuteResponse callParser(InstanceStatus instance, Task task) {
         try {
             ExecuteResponse response = instanceClient.executeTask(
                     instance.getIp(), 
                     instance.getPort(), 
-                    taskMessage);
+                    task);
 
             if (response == null || response.getCode() != 200) {
                 log.warn("调用解析器返回失败,实例:{},响应:{}", instance.getContainerId(), response);
@@ -486,4 +511,84 @@ public class InstanceMonitorService {
         }
     }
 
+    /**
+     * 获取实例管理信息
+     * @return 实例管理信息
+     */
+    public InstanceManagementResponse getInstanceManagementInfo() {
+        try {
+            Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+            
+            // 统计各状态实例数量
+            long idleCount = activeInstancePool.values().stream()
+                    .filter(status -> status.getStatus() == 0)
+                    .count();
+            long runningCount = activeInstancePool.values().stream()
+                    .filter(status -> status.getStatus() == 1)
+                    .count();
+            long errorCount = activeInstancePool.values().stream()
+                    .filter(status -> status.getStatus() == 2)
+                    .count();
+            
+            // 构建实例详情列表
+            List<InstanceManagementResponse.InstanceDetail> instanceDetails = activeInstancePool.values().stream()
+                    .map(status -> InstanceManagementResponse.InstanceDetail.builder()
+                            .containerId(status.getContainerId())
+                            .ip(status.getIp())
+                            .port(status.getPort())
+                            .status(status.getStatus())
+                            .cpuUsage(status.getCpuUsage())
+                            .memoryUsage(status.getMemoryUsage())
+                            .gpuUsage(status.getGpuUsage())
+                            .gpuMemory(status.getGpuMemory())
+                            .lastHeartbeatTime(status.getLastHeartbeatTime())
+                            .build())
+                    .collect(Collectors.toList());
+            
+            // 构建响应数据
+            InstanceManagementResponse.InstanceManagementData data = InstanceManagementResponse.InstanceManagementData.builder()
+                    .totalInstances(activeInstancePool.size())
+                    .idleInstances((int) idleCount)
+                    .runningInstances((int) runningCount)
+                    .errorInstances((int) errorCount)
+                    .instanceDetails(instanceDetails)
+                    .build();
+            
+            return InstanceManagementResponse.success(data);
+        } catch (Exception e) {
+            log.error("获取实例管理信息失败", e);
+            return InstanceManagementResponse.fail("获取实例管理信息失败: " + e.getMessage());
+        }
+    }
+    
+    /**
+     * 更新实例配置
+     * @param request 实例配置请求
+     * @return 操作结果
+     */
+    public InstanceManagementResponse updateInstanceConfig(InstanceConfigRequest request) {
+        try {
+            int minActiveInstance = request.getMinActiveInstance();
+            int maxActiveInstance = request.getMaxActiveInstance();
+            
+            // 校验最小实例数不能大于最大实例数
+            if (minActiveInstance > maxActiveInstance) {
+                return InstanceManagementResponse.fail("最小活跃实例数不能大于最大活跃实例数");
+            }
+            
+            // 更新配置
+            parserConfig.MIN_ACTIVE_INSTANCE = minActiveInstance;
+            parserConfig.MAX_ACTIVE_INSTANCE = maxActiveInstance;
+            
+            log.info("实例配置已更新:最小活跃实例数={}, 最大活跃实例数={}", 
+                    minActiveInstance, maxActiveInstance);
+            
+            // 获取更新后的实例管理信息
+            return getInstanceManagementInfo();
+        } catch (Exception e) {
+            log.error("更新实例配置失败", e);
+            return InstanceManagementResponse.fail("更新实例配置失败: " + e.getMessage());
+        }
+    }
+
 }

+ 177 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/TaskLogService.java

@@ -0,0 +1,177 @@
+
+package cn.com.yusys.manager.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 任务日志服务
+ * 负责管理任务的生命周期日志记录和查询
+ */
+@Slf4j
+@Service
+public class TaskLogService {
+
+    // 日志存储目录
+    @Value("${task.log.directory:./logs/tasks}")
+    private String logDirectory;
+
+    // 日志文件后缀
+    private static final String LOG_FILE_SUFFIX = ".log";
+
+    // 日期时间格式
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+    /**
+     * 记录任务日志
+     * @param taskId 任务ID
+     * @param stage 任务阶段
+     * @param message 日志消息
+     */
+    public void logTask(String taskId, String stage, String message) {
+        try {
+            // 确保日志目录存在
+            ensureLogDirectoryExists();
+
+            // 构建日志文件路径
+            String logFilePath = getLogFilePath(taskId);
+
+            // 构建日志内容
+            String logEntry = String.format("[%s] [%s] [%s] %s%n",
+                    LocalDateTime.now().format(DATE_TIME_FORMATTER),
+                    stage,
+                    taskId,
+                    message);
+
+            // 追加写入日志文件
+            appendToFile(logFilePath, logEntry);
+
+            log.info("任务日志已记录:taskId={}, stage={}, message={}", taskId, stage, message);
+        } catch (Exception e) {
+            log.error("记录任务日志失败:taskId={}, stage={}, message={}", taskId, stage, message, e);
+        }
+    }
+
+    /**
+     * 记录任务开始日志
+     * @param taskId 任务ID
+     * @param filePath 文件路径
+     */
+    public void logTaskStart(String taskId, String filePath) {
+        logTask(taskId, "TASK_START", String.format("任务开始执行,文件路径:%s", filePath));
+    }
+
+    /**
+     * 记录任务完成日志
+     * @param taskId 任务ID
+     * @param result 执行结果
+     */
+    public void logTaskComplete(String taskId, String result) {
+        logTask(taskId, "TASK_COMPLETE", String.format("任务执行完成,结果:%s", result));
+    }
+
+    /**
+     * 记录任务失败日志
+     * @param taskId 任务ID
+     * @param errorMessage 错误消息
+     */
+    public void logTaskFailure(String taskId, String errorMessage) {
+        logTask(taskId, "TASK_FAILURE", String.format("任务执行失败:%s", errorMessage));
+    }
+
+    /**
+     * 记录实例分配日志
+     * @param taskId 任务ID
+     * @param instanceId 实例ID
+     */
+    public void logInstanceAllocation(String taskId, String instanceId) {
+        logTask(taskId, "INSTANCE_ALLOCATION", String.format("分配解析实例:%s", instanceId));
+    }
+
+    /**
+     * 记录实例释放日志
+     * @param taskId 任务ID
+     * @param instanceId 实例ID
+     */
+    public void logInstanceRelease(String taskId, String instanceId) {
+        logTask(taskId, "INSTANCE_RELEASE", String.format("释放解析实例:%s", instanceId));
+    }
+
+    /**
+     * 记录解析器调用日志
+     * @param taskId 任务ID
+     * @param instanceId 实例ID
+     * @param message 消息
+     */
+    public void logParserCall(String taskId, String instanceId, String message) {
+        logTask(taskId, "PARSER_CALL", String.format("调用解析器,实例ID:%s,消息:%s", instanceId, message));
+    }
+
+    /**
+     * 查询任务日志
+     * @param taskId 任务ID
+     * @return 日志内容列表
+     */
+    public List<String> getTaskLogs(String taskId) {
+        try {
+            String logFilePath = getLogFilePath(taskId);
+            Path path = Paths.get(logFilePath);
+
+            if (!Files.exists(path)) {
+                log.warn("任务日志文件不存在:{}", logFilePath);
+                return new ArrayList<>();
+            }
+
+            return Files.readAllLines(path);
+        } catch (Exception e) {
+            log.error("查询任务日志失败:taskId={}", taskId, e);
+            return new ArrayList<>();
+        }
+    }
+
+    /**
+     * 获取任务日志文件路径
+     * @param taskId 任务ID
+     * @return 日志文件路径
+     */
+    private String getLogFilePath(String taskId) {
+        return logDirectory + File.separator + taskId + LOG_FILE_SUFFIX;
+    }
+
+    /**
+     * 确保日志目录存在
+     * @throws IOException 如果创建目录失败
+     */
+    private void ensureLogDirectoryExists() throws IOException {
+        Path path = Paths.get(logDirectory);
+        if (!Files.exists(path)) {
+            Files.createDirectories(path);
+            log.info("创建任务日志目录:{}", logDirectory);
+        }
+    }
+
+    /**
+     * 追加内容到文件
+     * @param filePath 文件路径
+     * @param content 内容
+     * @throws IOException 如果写入文件失败
+     */
+    private void appendToFile(String filePath, String content) throws IOException {
+        try (PrintWriter writer = new PrintWriter(new FileWriter(filePath, true))) {
+            writer.print(content);
+        }
+    }
+}

+ 6 - 8
schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java

@@ -3,6 +3,7 @@ package cn.com.yusys.manager.util;
 import cn.com.yusys.manager.config.ParserConfig;
 import cn.com.yusys.manager.model.ExecuteResponse;
 import cn.com.yusys.manager.model.InstanceStatusResponse;
+import cn.com.yusys.manager.model.Task;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -111,22 +112,19 @@ public class ParseInstanceClient {
      * @return 执行结果响应
      */
     public ExecuteResponse executeTask(
-            String instanceIp, Integer instancePort, String taskData) {
+            String instanceIp, Integer instancePort, Task taskData) {
         // 1. 参数校验
         if (instanceIp == null || instancePort == null) {
             log.warn("实例IP或端口为空,跳过任务执行");
             return ExecuteResponse.fail("实例IP或端口为空");
         }
 
+        String executeUrl = String.format("http://%s:%d%s", instanceIp, instancePort,"/execute");
         try {
             ExecuteResponse response = webClient.post()
-                    .uri(uriBuilder -> uriBuilder
-                            .scheme("http")
-                            .host(instanceIp)
-                            .port(instancePort)
-                            .path("/execute")
-                            .queryParam("file_path", taskData)
-                            .build())
+                    .uri(executeUrl)
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .bodyValue(taskData)
                     .retrieve()
                     .onStatus(HttpStatus::isError, clientResponse ->
                             Mono.error(new WebClientResponseException(