Просмотр исходного кода

增加模块之间调用的接口

zsh 3 недель назад
Родитель
Сommit
4a3994c9ef

+ 5 - 0
schedule-consumer/pom.xml

@@ -37,6 +37,11 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+        <!-- Spring WebFlux(WebClient) -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 1
schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java

@@ -85,7 +85,7 @@ public class KafkaConsumerConfig {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
-        factory.setConcurrency(3);
+        factory.setConcurrency(1);
 
         // 【核心】注入错误处理器
         factory.setCommonErrorHandler(errorHandler);

+ 39 - 6
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -1,8 +1,12 @@
 package cn.com.yusys.consumer.listener;
 
+import cn.com.yusys.consumer.model.Task;
+import cn.com.yusys.consumer.util.ParseServiceClient;
+import cn.com.yusys.consumer.util.response.ExecuteResponse;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
@@ -12,6 +16,9 @@ import org.springframework.stereotype.Component;
 public class MessageListener {
     private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
 
+    @Autowired
+    private ParseServiceClient parseServiceClient;
+
     // 从配置读取逗号分隔的 Topic 字符串
     @Value("${kafka.topics.listen}")
     private String topicsConfig;
@@ -26,19 +33,45 @@ public class MessageListener {
         log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
                 record.topic(), record.key(), message, record.offset());
 
-        processBusinessLogic(message);
-        acknowledgment.acknowledge();
+        ExecuteResponse executeResponse = processBusinessLogic(message);
+
+        if (executeResponse != null && executeResponse.getCode()==200) {
+            log.info("=== [CONSUMER] 任务处理成功 ===");
+            acknowledgment.acknowledge();
+
+        }else{
+            log.info("=== [CONSUMER] 任务处理失败===");
+            try {
+                // 任务处理失败时睡眠5秒
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                // 捕获中断异常,恢复线程中断状态
+                log.error("=== [CONSUMER] sleep过程中线程被中断 ===", e);
+                Thread.currentThread().interrupt(); // 重置中断标记
+            }
+            log.info("=== [CONSUMER] sleep结束,继续处理后续任务 ===");
+
+        }
+
 
     }
 
-    private void processBusinessLogic(String message) {
+    private ExecuteResponse processBusinessLogic(String message) {
         if ("error".equals(message)) {
             throw new RuntimeException("Simulated Business Exception");
         }
+        
         try {
-            Thread.sleep(500);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+            // 调用ParseServiceClient执行任务
+            log.info("开始调用execute接口执行任务,消息内容:{}", message);
+            Task task=new Task();
+            task.setFilePath(message);
+            ExecuteResponse response = parseServiceClient.executeTask(task);
+            log.info("任务执行完成,响应:{}", response);
+            return response;
+        } catch (Exception e) {
+            log.error("调用execute接口执行任务失败", e);
+            throw new RuntimeException("任务执行失败", e);
         }
     }
 }

+ 15 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/model/Task.java

@@ -0,0 +1,15 @@
+package cn.com.yusys.consumer.model;
+
+import lombok.Data;
+
+/**
+ * 任务类,用于存储任务相关信息
+ */
+@Data
+public class Task {
+
+    /**
+     * 文件路径,用于存储任务关联的文件路径信息
+     */
+    private String filePath;
+}

+ 85 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java

@@ -0,0 +1,85 @@
+package cn.com.yusys.consumer.util;
+
+import cn.com.yusys.consumer.model.Task;
+import cn.com.yusys.consumer.util.response.ExecuteResponse;
+import cn.com.yusys.consumer.util.response.InstanceStatusResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+
+/**
+ * 解析服务实例实例接口调用工具
+ */
+@Slf4j
+@Component
+public class ParseServiceClient {
+
+
+    private final WebClient webClient;
+
+    // 通过构造函数注入WebClient.Builder,利用Spring自动配置
+    public ParseServiceClient(WebClient.Builder webClientBuilder) {
+        this.webClient = webClientBuilder
+                .codecs(config -> config.defaultCodecs().maxInMemorySize(1024 * 1024))
+                .build();
+    }
+
+    /**
+     * 调用/execute接口执行任务
+     * @param taskData 任务数据
+     * @return 执行结果响应
+     */
+    public ExecuteResponse executeTask(Task taskData) {
+        String executeUrl = "http://127.0.0.1:8083//api/manager/parse";
+        try {
+            ExecuteResponse response = webClient.post()
+                    .uri(executeUrl)
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .bodyValue(taskData)
+                    .retrieve()
+                    .onStatus(HttpStatus::isError, clientResponse ->
+                            Mono.error(new WebClientResponseException(
+                                    "执行接口返回异常状态码",
+                                    clientResponse.statusCode().value(),
+                                    clientResponse.statusCode().getReasonPhrase(),
+                                    null, null, null)))
+                    .bodyToMono(ExecuteResponse.class)
+                    .timeout(java.time.Duration.ofSeconds(30))
+                    .block();
+
+            if (response != null && 200 == response.getCode()) {
+                log.debug("任务执行成功,响应:{}", response);
+                return response;
+            } else {
+                log.warn("任务执行返回失败,响应:{}", response);
+                return createErrorResponse("接口返回非200响应");
+            }
+        } catch (WebClientResponseException e) {
+            log.error("任务执行失败,HTTP状态码:{}", e.getRawStatusCode(), e);
+            return createErrorResponse(String.format("HTTP请求失败,状态码:%d", e.getRawStatusCode()));
+        } catch (Exception e) {
+            log.error("任务执行异常", e);
+            return createErrorResponse(e.getMessage());
+        }
+    }
+
+
+
+    /**
+     * 创建错误响应
+     * @param errorMessage 错误消息
+     * @return ExecuteResponse 错误响应
+     */
+    private ExecuteResponse createErrorResponse(String errorMessage) {
+        ExecuteResponse response = new ExecuteResponse();
+        response.setCode(500);
+        response.setMessage(errorMessage);
+        return response;
+    }
+
+
+}

+ 24 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/ExecuteResponse.java

@@ -0,0 +1,24 @@
+package cn.com.yusys.consumer.util.response;
+
+import lombok.Data;
+
+/**
+ * 执行任务响应类
+ */
+@Data
+public class ExecuteResponse {
+    /**
+     * 响应码
+     */
+    private Integer code;
+
+    /**
+     * 响应消息
+     */
+    private String message;
+
+    /**
+     * 响应数据
+     */
+    private Object data;
+}

+ 40 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/InstanceStatusResponse.java

@@ -0,0 +1,40 @@
+package cn.com.yusys.consumer.util.response;
+
+import lombok.Data;
+
+/**
+ * 实例状态响应类
+ */
+@Data
+public class InstanceStatusResponse {
+    /**
+     * 响应码
+     */
+    private Integer code;
+
+    /**
+     * 响应消息
+     */
+    private String message;
+
+    /**
+     * 状态数据
+     */
+    private StatusData data;
+
+    /**
+     * 状态数据内部类
+     */
+    @Data
+    public static class StatusData {
+        /**
+         * 状态
+         */
+        private String status;
+
+        /**
+         * 其他状态信息
+         */
+        private Object info;
+    }
+}

+ 1 - 1
schedule-consumer/src/main/resources/application.yml

@@ -16,4 +16,4 @@ spring:
 # 可选:在配置中定义 Topic 列表,方便管理
 kafka:
   topics:
-    listen: test-topic
+    listen: schedule-topic

+ 31 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/controller/ManagerController.java

@@ -0,0 +1,31 @@
+package cn.com.yusys.manager.controller;
+
+import cn.com.yusys.manager.model.ExecuteResponse;
+import cn.com.yusys.manager.model.Task;
+import cn.com.yusys.manager.service.InstanceMonitorService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 调度管理控制器
+ */
+@RestController
+@RequestMapping("/api/manager")
+public class ManagerController {
+
+    @Autowired
+    private InstanceMonitorService parserService;
+
+    /**
+     * 接收任务调用,解析实例并执行解析任务
+     * @param request 任务请求对象
+     * @return 任务执行结果
+     */
+    @PostMapping("/parse")
+    public ExecuteResponse executeParseTask(@RequestBody Task request) {
+        return parserService.processMultimodalTask(request);
+    }
+}

+ 32 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/ExecuteResponse.java

@@ -0,0 +1,32 @@
+package cn.com.yusys.manager.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * 执行结果响应类
+ * 用于封装API接口的返回结果,包含状态码、消息和数据
+ */
+@Data
+@AllArgsConstructor
+@RequiredArgsConstructor
+public  class ExecuteResponse {  // 执行结果响应类
+    private Integer code;  // 响应状态码,通常用于表示请求处理结果的状态
+    private String message;  // 响应消息,通常用于描述请求处理结果的信息
+    private Object data;  // 响应数据,用于承载请求处理后的返回数据
+
+
+
+    public static ExecuteResponse success(Object data) {
+        return new ExecuteResponse(200, "success", data);
+    }
+    public static ExecuteResponse fail(String message) {
+        return new ExecuteResponse(500, message, null);
+    }
+
+    public static ExecuteResponse fail(Integer code,String message) {
+        return new ExecuteResponse(code, message, null);
+    }
+
+}

+ 16 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/Task.java

@@ -0,0 +1,16 @@
+package cn.com.yusys.manager.model;
+
+
+import lombok.Data;
+
+/**
+ * Task类,表示一个任务对象
+ */
+@Data
+public class Task {
+
+    /**
+     * 文件路径,用于指定任务关联的文件位置
+     */
+    private String filePath;
+}

+ 103 - 4
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -2,11 +2,13 @@ package cn.com.yusys.manager.service;
 
 import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
 import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.ExecuteResponse;
 import cn.com.yusys.manager.model.InstanceStatus;
 import cn.com.yusys.manager.model.InstanceStatusResponse;
 import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
+import cn.com.yusys.manager.model.Task;
 import cn.com.yusys.manager.util.ParseInstanceClient;
-import cn.com.yusys.manager.common.PortPool; 
+import cn.com.yusys.manager.common.PortPool;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -18,7 +20,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -50,6 +51,10 @@ public class InstanceMonitorService {
     @Resource
     private PortPool portPool;
 
+    // 最大重试次数
+    @org.springframework.beans.factory.annotation.Value("${parser.task.max-retry:3}")
+    private int maxRetry;
+
     @PostConstruct
     public void initParseInstance(){
         log.info("开始初始化解析实例...");
@@ -77,9 +82,9 @@ public class InstanceMonitorService {
     /**
      * 核心监控定时任务:
      * - initialDelay = 30000:首次执行延迟30秒
-     * - fixedRate = 5000:之后每5秒执行一次
+     * - fixedRate = 10000:之后每10秒执行一次
      */
-    @Scheduled(initialDelay = 30000, fixedRate = 5000)
+    @Scheduled(initialDelay = 30000, fixedRate = 10000)
     public void parserInstanceMonitor() {
         Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
         try {
@@ -387,4 +392,98 @@ public class InstanceMonitorService {
             }
         }
     }
+
+    /**
+     * 执行多模态任务解析任务,阻塞调用底层解析器执行任务
+     * 失败任务重试maxRetry次后,转入失败Topic
+     */
+   public ExecuteResponse processMultimodalTask(Task task) {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        try {
+            //  检查是否有空闲的解析实例
+            InstanceStatus idleInstance = findIdleInstance(activeInstancePool);
+            if (idleInstance == null) {
+                log.debug("当前无空闲解析实例");
+                return ExecuteResponse.fail(300,"当前无空闲解析实例");
+            }
+
+            //  执行任务解析
+            return executeTaskWithRetry(idleInstance, task.getFilePath());
+
+        } catch (Exception e) {
+            log.error("多模态任务解析定时任务执行失败", e);
+            return ExecuteResponse.fail("多模态任务解析定时任务执行失败");
+        }
+    }
+
+    /**
+     * 查找空闲的解析实例
+     */
+    private InstanceStatus findIdleInstance(Map<String, InstanceStatus> activeInstancePool) {
+        return activeInstancePool.values().stream()
+                .filter(status -> status.getStatus() == 0) // 状态为0表示空闲
+                .findFirst()
+                .orElse(null);
+    }
+
+
+    /**
+     * 执行任务并处理重试逻辑
+     */
+    private ExecuteResponse executeTaskWithRetry(InstanceStatus instance, String taskMessage) {
+        String instanceId = instance.getContainerId();
+        int retryCount = 0;
+        // 标记实例为运行中
+        instance.setStatus(1);
+
+        try {
+            while (retryCount <= maxRetry ) {
+                try {
+                    log.info("开始执行任务,实例:{},重试次数:{}/{},任务内容:{}", 
+                            instanceId, retryCount, maxRetry, taskMessage);
+
+                    // 调用解析器执行任务
+                    ExecuteResponse response = callParser(instance, taskMessage);
+
+                    if (response != null && response.getCode() == 200) {
+                        log.info("任务执行成功,实例:{},响应:{}", instanceId, response);
+                        return response;
+                    } else {
+                        log.warn("任务执行返回失败,实例:{},响应:{},准备重试", instanceId, response);
+                        retryCount++;
+                    }
+                } catch (Exception e) {
+                    log.error("任务执行异常,实例:{},重试次数:{}/{}", instanceId, retryCount, maxRetry, e);
+                    retryCount++;
+                }
+
+            }
+
+            return ExecuteResponse.fail("任务执行失败,已达最大重试次数");
+        } finally {
+            // 恢复实例状态为空闲
+            instance.setStatus(0);
+        }
+    }
+
+    /**
+     * 调用解析器执行任务
+     */
+    private ExecuteResponse callParser(InstanceStatus instance, String taskMessage) {
+        try {
+            ExecuteResponse response = instanceClient.executeTask(
+                    instance.getIp(), 
+                    instance.getPort(), 
+                    taskMessage);
+
+            if (response == null || response.getCode() != 200) {
+                log.warn("调用解析器返回失败,实例:{},响应:{}", instance.getContainerId(), response);
+            }
+            return response;
+        } catch (Exception e) {
+            log.error("调用解析器失败,实例:{}", instance.getContainerId(), e);
+            throw e;
+        }
+    }
+
 }

+ 55 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java

@@ -1,6 +1,7 @@
 package cn.com.yusys.manager.util;
 
 import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.ExecuteResponse;
 import cn.com.yusys.manager.model.InstanceStatusResponse;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
@@ -101,4 +102,58 @@ public class ParseInstanceClient {
 
         return errorResponse;
     }
+
+    /**
+     * 调用Python实例的/execute接口执行任务
+     * @param instanceIp 实例IP
+     * @param instancePort 实例端口
+     * @param taskData 任务数据
+     * @return 执行结果响应
+     */
+    public ExecuteResponse executeTask(
+            String instanceIp, Integer instancePort, String taskData) {
+        // 1. 参数校验
+        if (instanceIp == null || instancePort == null) {
+            log.warn("实例IP或端口为空,跳过任务执行");
+            return ExecuteResponse.fail("实例IP或端口为空");
+        }
+
+        try {
+            ExecuteResponse response = webClient.post()
+                    .uri(uriBuilder -> uriBuilder
+                            .scheme("http")
+                            .host(instanceIp)
+                            .port(instancePort)
+                            .path("/execute")
+                            .queryParam("file_path", taskData)
+                            .build())
+                    .retrieve()
+                    .onStatus(HttpStatus::isError, clientResponse ->
+                            Mono.error(new WebClientResponseException(
+                                    "执行接口返回异常状态码",
+                                    clientResponse.statusCode().value(),
+                                    clientResponse.statusCode().getReasonPhrase(),
+                                    null, null, null)))
+                    .bodyToMono(ExecuteResponse.class)
+                    .timeout(java.time.Duration.ofSeconds(30))
+                    .block();
+
+            // 2. 响应校验
+            if (response != null && 200 == response.getCode()) {
+                log.debug("实例{}:{}任务执行成功", instanceIp, instancePort);
+                return response;
+            } else {
+                log.warn("实例{}:{}任务执行返回失败,响应:{}", instanceIp, instancePort, response);
+                return ExecuteResponse.fail("接口返回非200响应");
+            }
+        } catch (WebClientResponseException e) {
+            log.error("实例{}:{}任务执行失败,HTTP状态码:{}", instanceIp, instancePort, e.getRawStatusCode(), e);
+            return ExecuteResponse.fail(String.format("HTTP请求失败,状态码:%d", e.getRawStatusCode()));
+        } catch (Exception e) {
+            log.error("实例{}:{}任务执行异常", instanceIp, instancePort, e);
+            return ExecuteResponse.fail(e.getMessage());
+        }
+    }
+
+
 }

+ 3 - 1
schedule-manager/src/main/resources/application.yml

@@ -17,6 +17,8 @@ spring:
 kafka:
   topics:
     listen: test-topic
+    task: task-topic
+    failed: task-failed
 
 docker:
   host: tcp://127.0.0.1:2375 # Docker Daemon地址
@@ -34,7 +36,7 @@ parser:
     min-active-instance: 3
 
     # 最大活跃实例数,资源上限
-    max-active-instance: 10
+    max-active-instance: 4
 
     # 任务积压阈值,触发临时扩容的 Kafka Lag 阈值 (> 100)
     task-backlog-threshold: 100

+ 1 - 1
schedule-producer/src/main/resources/application.yml

@@ -12,7 +12,7 @@ spring:
       acks: all
       retries: 3
     topics:
-      test-topic:
+      schedule-topic:
         partitions: 6