2 Комити a5302676af ... 2786becd3f

Аутор SHA1 Порука Датум
  zsh 2786becd3f 增加部署文档 пре 1 недеља
  zsh cfdcd20469 增加项目部署文档 пре 1 недеља
19 измењених фајлова са 185 додато и 58 уклоњено
  1. 18 4
      README.md
  2. 1 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/ConsumerApplication.java
  3. 16 1
      schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java
  4. 24 7
      schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java
  5. 9 3
      schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java
  6. 1 1
      schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/ExecuteResponse.java
  7. 11 2
      schedule-consumer/src/main/resources/application.yml
  8. 1 0
      schedule-manager/parser/__init__.py
  9. 34 13
      schedule-manager/parser/parse_service.py
  10. 7 0
      schedule-manager/parser/pyproject.toml
  11. 1 1
      schedule-manager/parser/readme.md
  12. 7 0
      schedule-manager/parser/setup.py
  13. 3 0
      schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java
  14. 3 0
      schedule-manager/src/main/java/cn/com/yusys/manager/entity/TaskRecordEntity.java
  15. 1 1
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/ProcessInstanceManager.java
  16. 1 1
      schedule-manager/src/main/java/cn/com/yusys/manager/model/ExecuteResponse.java
  17. 28 12
      schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java
  18. 4 4
      schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java
  19. 15 8
      schedule-manager/src/main/resources/application.yml

+ 18 - 4
README.md

@@ -135,19 +135,33 @@ mvn clean package
 ```bash
 # 启动Producer
 java -jar schedule-producer/target/schedule-producer.jar
+nohup java -jar schedule-producer.jar > producer.log 2>&1 &
+
 
 # 启动Consumer
 java -jar schedule-consumer/target/schedule-consumer.jar
+nohup java -jar schedule-consumer-1.0-SNAPSHOT.jar \
+  --spring.config.location=./application.yml > consumer.log 2>&1 &
 
 # 启动Manager
 java -jar schedule-manager/target/schedule-manager.jar
-
+java -jar schedule-manager-1.0-SNAPSHOT.jar \
+  --spring.config.location=./application.yml   > manager.log 2>&1 &
 # 启动Monitor
 java -jar schedule-monitor/target/schedule-monitor.jar
 ```
-解析服务镜像打包:
-```bash
-docker build -t parse-service:latest  .
+解析服务部署:
+```
+启动manager
+在 /home/app/four-level-schedule/jar/schedule-manager/schedule-manager-1.0-SNAPSHOT.jar 目录下执行
+java -jar schedule-manager-1.0-SNAPSHOT.jar \
+  --spring.config.location=./application.yml   > manager.log 2>&1 &
+  
+  
+ 启动Consumer
+ 在/home/app/four-level-schedule/jar/schedule-consumer/schedule-consumer-1.0-SNAPSHOT.jar 执行
+ nohup java -jar schedule-consumer-1.0-SNAPSHOT.jar \
+  --spring.config.location=./application.yml > consumer.log 2>&1 &
 ```
 
 ## 测试指南

+ 1 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/ConsumerApplication.java

@@ -7,5 +7,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 public class ConsumerApplication {
     public static void main(String[] args) {
         SpringApplication.run(ConsumerApplication.class, args);
+
     }
 }

+ 16 - 1
schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java

@@ -28,6 +28,14 @@ public class KafkaConsumerConfig {
     @Value("${spring.kafka.consumer.group-id}")
     private String groupId;
 
+    @Value("${spring.kafka.listener.concurrency:80}")
+    private Integer concurrency;
+
+    @Value("${spring.kafka.consumer.max-poll-records:30}")
+    private Integer maxPollRecords;
+
+    @Value("${spring.kafka.consumer.max-poll-interval-ms:600000}")
+    private Integer maxPollIntervalMs;
     // ===================== 死信队列生产者 =====================
     @Bean
     public ProducerFactory<String, String> dltProducerFactory() {
@@ -81,7 +89,14 @@ public class KafkaConsumerConfig {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        // 每次最多拉取 30 条
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+
+        // 2. 最大处理间隔 10 分钟 (600000 毫秒)
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
 
+        // 可选:设置心跳间隔 (通常设为 max_poll_interval_ms 的 1/3)
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 20000);
         return new DefaultKafkaConsumerFactory<>(props);
     }
 
@@ -93,7 +108,7 @@ public class KafkaConsumerConfig {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
-        factory.setConcurrency(24);
+        factory.setConcurrency(concurrency);
 
         // 注入错误处理器
         factory.setCommonErrorHandler(errorHandler);

+ 24 - 7
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -5,15 +5,18 @@ import cn.com.yusys.consumer.model.TaskRecordRequest;
 import cn.com.yusys.consumer.util.ParseServiceClient;
 import cn.com.yusys.consumer.util.TaskRecordClient;
 import cn.com.yusys.consumer.util.response.ExecuteResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
+import java.util.Map;
 import java.util.UUID;
 
 @Component
@@ -30,22 +33,36 @@ public class MessageListener {
     @Value("${kafka.topics.listen}")
     private String topicsConfig;
 
+    // 注入 ConsumerFactory
+    private final ConsumerFactory<?, ?> consumerFactory;
+
+    public MessageListener(ConsumerFactory<?, ?> consumerFactory) {
+        this.consumerFactory = consumerFactory;
+        Map<String, Object> configs = consumerFactory.getConfigurationProperties();
+
+        Object maxRecords = configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+        Object maxInterval = configs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+
+        log.info(">>> [配置验证] Factory 中的配置 -> max.poll.records: {}, max.poll.interval.ms: {}", maxRecords, maxInterval);
+    }
     /**
      * 监听多个 Topic
      * 注意:同一个 groupId 下的不同消费者实例会共同负载均衡消费这些 Topic 的所有分区。
      */
     @KafkaListener(   topics = "#{'${kafka.topics.listen}'.split(',')}",
-            groupId = "${spring.kafka.consumer.group-id}",    concurrency = "24"
+            groupId = "${spring.kafka.consumer.group-id}"
     )
     public void listen(String message, ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
         // 打印当前消息来自哪个 Topic,方便区分
-        log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
-                record.topic(), record.key(), message, record.offset());
+
+
+        log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} | Thread: {} ===",
+                record.topic(), record.key(), message, record.offset(), Thread.currentThread().getName());
 
         ExecuteResponse executeResponse = processBusinessLogic(message, record);
 
         if ( executeResponse.getCode()==200) {
-            log.info("=== [CONSUMER] 任务处理成功 ===");
+            log.info("=== [CONSUMER] 任务处理成功 :{} {}===",message,Thread.currentThread().getName());
             acknowledgment.acknowledge();
 
         }else{
@@ -73,7 +90,7 @@ public class MessageListener {
             Task task = new Task();
             task.setTaskId(taskId);
             task.setFilePath(message);
-            
+
             // 创建任务记录
             String logPath = "logs/tasks/" + taskId + ".log";
             TaskRecordRequest taskRecordRequest = new TaskRecordRequest();
@@ -86,11 +103,11 @@ public class MessageListener {
             if (!recordCreated) {
                 throw new RuntimeException("创建任务记录失败");
             }
-            
+
             // 调用ParseServiceClient执行任务
             log.info("开始调用execute接口执行任务,消息内容:{}", message);
             ExecuteResponse response = parseServiceClient.executeTask(task);
-            log.info("任务执行完成,响应:{}", response.getMessage());
+            log.info("任务执行完成,响应:{}", response.getMsg());
             return response;
         } catch (Exception e) {
             log.error("处理消息任务失败", e);

+ 9 - 3
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java

@@ -24,6 +24,9 @@ public class ParseServiceClient {
     @Value("${url.parse:http://127.0.0.1:8083/api/task/parse}")
     private String parseUrl;
 
+    @Value("${task.timeout-seconds:301}")
+    private Integer timeoutSeconds;
+
     // 通过构造函数注入WebClient.Builder,利用Spring自动配置
     public ParseServiceClient(WebClient.Builder webClientBuilder) {
         this.webClient = webClientBuilder
@@ -50,7 +53,7 @@ public class ParseServiceClient {
                                     clientResponse.statusCode().getReasonPhrase(),
                                     null, null, null)))
                     .bodyToMono(ExecuteResponse.class)
-                    .timeout(java.time.Duration.ofSeconds(30))
+                    .timeout(java.time.Duration.ofSeconds(timeoutSeconds))
                     .block();
 
             if (response != null && 200 == response.getCode()) {
@@ -58,7 +61,10 @@ public class ParseServiceClient {
                 return response;
             } else {
                 log.warn("任务执行返回失败,响应:{}", response);
-                return createErrorResponse("接口返回非200响应");
+                if (response != null) {
+                    return createErrorResponse(response.getMsg() != null ? response.getMsg() : "任务执行失败");
+                }
+                return createErrorResponse("任务执行失败,响应为空");
             }
         } catch (WebClientResponseException e) {
             log.error("任务执行失败,HTTP状态码:{}", e.getRawStatusCode(), e);
@@ -79,7 +85,7 @@ public class ParseServiceClient {
     private ExecuteResponse createErrorResponse(String errorMessage) {
         ExecuteResponse response = new ExecuteResponse();
         response.setCode(500);
-        response.setMessage(errorMessage);
+        response.setMsg(errorMessage);
         return response;
     }
 

+ 1 - 1
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/ExecuteResponse.java

@@ -15,7 +15,7 @@ public class ExecuteResponse {
     /**
      * 响应消息
      */
-    private String message;
+    private String msg;
 
     /**
      * 响应数据

+ 11 - 2
schedule-consumer/src/main/resources/application.yml

@@ -7,11 +7,15 @@ spring:
   kafka:
     bootstrap-servers: 10.192.72.13:9092
     consumer:
-      group-id: schedule-topic-group
+      group-id: schedule-group-3
+      max-poll-records: 30  # 一次拉30条
+      max-poll-interval-ms: 600000  # 处理超时 10分钟
       enable-auto-commit: false
       auto-offset-reset: earliest
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    listener:
+      concurrency: 6
 
 # 可选:在配置中定义 Topic 列表,方便管理
 kafka:
@@ -19,5 +23,10 @@ kafka:
     listen: task_voucher_medium,task_contract_high,task_credit_risk_low,task_credit_risk_medium,task_post_loan_low,task_customer_high,task_other_high,schedule-topic,task_institution_high,task_other_medium,task_task_high,task_contract_low,task_institution_medium,task_voucher_high,task_voucher_low,task_other_low,task_institution_low,task_credit_risk_high,task_customer_low,task_customer_medium,task_post_loan_high,task_contract_medium,task_post_loan_medium
 
 url:
-  parse: "http://127.0.0.1:1083//api/task/parse"
+  parse: "http://127.0.0.1:1086/execute"
   record: "http://127.0.0.1:1083/api/task/create-record"
+
+task:
+  timeout-seconds: 301 # 解析任务执行超时(秒)
+
+

+ 1 - 0
schedule-manager/parser/__init__.py

@@ -0,0 +1 @@
+from .parse_service import main

+ 34 - 13
schedule-manager/parser/parse_service.py

@@ -38,21 +38,42 @@ async def execute_task(request: ExecuteRequest):
     """
     接收Java端下发的解析任务,后台异步执行
     """
+    try:
+        # 验证请求参数
+        if not request.file_path or not request.task_id:
+            return {
+                "code": 400,
+                "msg": "参数错误:filePath和taskId不能为空",
+                "data": None
+            }
 
-    # 调用远程解析服务
-    remote_url = "http://10.192.72.13:1086/execute"
+        # 调用远程解析服务
+        remote_url = "http://localhost:1086/execute"
 
-    async with httpx.AsyncClient() as client:
-        response = await client.post(
-            remote_url,
-            json={
-                "filePath": request.file_path,
-                "taskId": request.task_id
-            },
-            timeout=300.0
-        )
-        response.raise_for_status()
-        return response.json()
+        async with httpx.AsyncClient(timeout=300) as client:
+            try:
+                response = await client.post(
+                    remote_url,
+                    json={
+                        "filePath": request.file_path,
+                        "taskId": request.task_id
+                    }
+                )
+                response.raise_for_status()
+                return response.json()
+            except Exception as e:
+                return {
+                    "code": 500,
+                    "msg": f"调用远程解析服务失败: {str(e)}",
+                    "data": None
+                }
+
+    except Exception as e:
+        return {
+            "code": 500,
+            "msg": f"服务内部错误: {str(e)}",
+            "data": None
+        }
 
 @app.get("/status", summary="状态接口")
 async def health_check():

+ 7 - 0
schedule-manager/parser/pyproject.toml

@@ -0,0 +1,7 @@
+[build-system]
+requires = ["setuptools>=61.0"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "parser_service"
+version = "0.1.0"

+ 1 - 1
schedule-manager/parser/readme.md

@@ -7,7 +7,7 @@ docker buildx inspect --bootstrap
 # 构建并加载到本地,适配amd64和arm64
 docker buildx build \
 --platform linux/amd64 \
--t parse:amd64 \
+-t parse-instance:amd64 \
 --load .
 
 

+ 7 - 0
schedule-manager/parser/setup.py

@@ -0,0 +1,7 @@
+from setuptools import setup, find_packages
+
+setup(
+    name="parser_service",
+    version="0.1.0",
+    packages=find_packages(),
+)

+ 3 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java

@@ -40,6 +40,9 @@ public class ParserConfig {
     @Value("${parser.monitor.gpu-load-threshold}")
     public double GPU_LOAD_THRESHOLD;
 
+    @Value("${parser.monitor.execute-timeout}")
+    public int EXECUTE_TIMEOUT;
+
     // GPU负载扩容实例数
     @Value("${parser.monitor.gpu-scale-instance-num}")
     public int GPU_SCALE_INSTANCE_NUM;

+ 3 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/entity/TaskRecordEntity.java

@@ -1,5 +1,7 @@
 package cn.com.yusys.manager.entity;
 
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.Data;
 import java.time.LocalDateTime;
@@ -15,6 +17,7 @@ public class TaskRecordEntity {
     /**
      * 主键ID
      */
+    @TableId(type = IdType.AUTO)
     private Integer id;
 
     /**

+ 1 - 1
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/ProcessInstanceManager.java

@@ -145,7 +145,7 @@ public class ProcessInstanceManager implements InstanceManager {
         }
 
         ProcessBuilder processBuilder = new ProcessBuilder(
-                pythonPath,
+                "python",
                 scriptFullPath.toString(),
                 "--host", "0.0.0.0",
                 "--port", String.valueOf(port)

+ 1 - 1
schedule-manager/src/main/java/cn/com/yusys/manager/model/ExecuteResponse.java

@@ -13,7 +13,7 @@ import lombok.RequiredArgsConstructor;
 @RequiredArgsConstructor
 public  class ExecuteResponse {  // 执行结果响应类
     private Integer code;  // 响应状态码,通常用于表示请求处理结果的状态
-    private String message;  // 响应消息,通常用于描述请求处理结果的信息
+    private String msg;  // 响应消息,通常用于描述请求处理结果的信息
     private Object data;  // 响应数据,用于承载请求处理后的返回数据
 
 

+ 28 - 12
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -8,6 +8,7 @@ import cn.com.yusys.manager.util.ParseInstanceClient;
 import cn.com.yusys.manager.common.PortPool;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
@@ -17,6 +18,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 /**
@@ -56,6 +58,12 @@ public class InstanceMonitorService {
     @Resource
     private TaskRecordService taskRecordService;
 
+    @Value("${parser.filePath.source}")
+    private String sourcePath;
+
+    @Value("${parser.filePath.target}")
+    private String targetPath;
+
     @PostConstruct
     public void initParseInstance(){
         log.info("开始初始化解析实例...");
@@ -329,7 +337,8 @@ public class InstanceMonitorService {
    public ExecuteResponse processMultimodalTask(Task task) {
         Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
         String taskId = task.getTaskId();
-        
+
+
         try {
             TaskRecordRequest taskRecordRequest=new TaskRecordRequest();
             taskRecordRequest.setTaskId(taskId);
@@ -403,9 +412,9 @@ public class InstanceMonitorService {
             
             // 记录任务完成日志
             if (response.getCode() == 200) {
-                taskLogService.logTaskComplete(taskId, response.getMessage());
+                taskLogService.logTaskComplete(taskId, response.getMsg()+"执行结果"+response.getData());
             } else {
-                taskLogService.logTaskFailure(taskId,response.getMessage());
+                taskLogService.logTaskFailure(taskId,response.getMsg());
             }
             
             return response;
@@ -426,12 +435,12 @@ public class InstanceMonitorService {
      * 查找空闲的解析实例
      */
     private InstanceStatus findIdleInstance(Map<String, InstanceStatus> activeInstancePool) {
-        return activeInstancePool.values().stream()
-                .filter(status -> status.getStatus() == 0) // 状态为0表示空闲
-                .findFirst()
-                .orElse(null);
-    }
+        List<InstanceStatus> idleList = activeInstancePool.values().stream()
+                .filter(status -> status.getStatus() == 0)
+                .collect(Collectors.toList());
 
+        return idleList.isEmpty() ? null : idleList.get(ThreadLocalRandom.current().nextInt(idleList.size()));
+    }
 
     /**
      * 执行任务
@@ -446,9 +455,9 @@ public class InstanceMonitorService {
 
             // 调用解析器执行任务
             ExecuteResponse response = callParser(instance, task);
-
+            log.info("任务执行结束,实例:{},响应:{}", instanceId, response);
             if (response != null && response.getCode() == 200) {
-                log.info("任务执行成功,实例:{},响应:{}", instanceId, response.getMessage());
+                log.info("任务执行成功,实例:{},响应:{}", instanceId, response.getMsg());
                 return response;
             } else {
                 log.warn("任务执行失败,实例:{},响应:{}", instanceId, response);
@@ -474,8 +483,15 @@ public class InstanceMonitorService {
                     instance.getPort(), 
                     task);
 
-            if (response == null || response.getCode() != 200) {
-                log.warn("调用解析器返回失败,实例:{},响应:{}", instance.getInstanceId(), response);
+            if (response == null || response.getCode() != 200 ) {
+                log.error("调用解析器返回失败,实例:{},响应:{}", instance.getInstanceId(), response);
+                return response;
+            }
+            String data=(String) response.getData();
+            if(data.startsWith("解析失败")){
+                log.error("解析器执行任务失败,实例:{},响应:{}", instance.getInstanceId(), response);
+                response.setCode(500);
+                response.setMsg("解析器执行任务失败");
             }
             return response;
         } catch (Exception e) {

+ 4 - 4
schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java

@@ -51,7 +51,7 @@ public class ParseInstanceClient {
             // 获取超时时间,增加默认值兜底
             long timeout = parserConfig != null && parserConfig.STATUS_QUERY_TIMEOUT > 0
                     ? parserConfig.STATUS_QUERY_TIMEOUT
-                    : 3000; // 默认3秒超时
+                    : 3; // 默认3秒超时
 
             InstanceStatusResponse response = webClient.get()
                     .uri(statusUrl)
@@ -66,7 +66,7 @@ public class ParseInstanceClient {
                                     null, null, null)))
                     .bodyToMono(InstanceStatusResponse.class)
                     // 设置超时时间(增加默认值)
-                    .timeout(java.time.Duration.ofMillis(timeout))
+                    .timeout(java.time.Duration.ofSeconds(timeout))
                     .block();
 
             // 2. 响应校验:返回有效响应或错误响应
@@ -133,7 +133,7 @@ public class ParseInstanceClient {
                                     clientResponse.statusCode().getReasonPhrase(),
                                     null, null, null)))
                     .bodyToMono(ExecuteResponse.class)
-                    .timeout(java.time.Duration.ofSeconds(30))
+                    .timeout(java.time.Duration.ofSeconds(parserConfig.EXECUTE_TIMEOUT))
                     .block();
 
             // 2. 响应校验
@@ -142,7 +142,7 @@ public class ParseInstanceClient {
                 return response;
             } else {
                 log.warn("实例{}:{}任务执行返回失败,响应:{}", instanceIp, instancePort, response);
-                return ExecuteResponse.fail("接口返回非200响应");
+                return ExecuteResponse.fail(response.getMsg() != null ? response.getMsg() : "接口返回非200响应");
             }
         } catch (WebClientResponseException e) {
             log.error("实例{}:{}任务执行失败,HTTP状态码:{}", instanceIp, instancePort, e.getRawStatusCode(), e);

+ 15 - 8
schedule-manager/src/main/resources/application.yml

@@ -24,31 +24,38 @@ parser:
     work-dir: .
     image: parse-instance:amd64
     #实例启动方式
-    type: docker
+    type: process
 
     #实例启动端口池
     port:
-      start: 1030
-      end: 1050
+      start: 1010
+      end: 1060
+  filePath:
+    source: /data/app/multimodal_project/classifyContent/distributed_output
+    target: ./examples
+
+
 
 
-  
   # 监控参数配置
   monitor:
     # 心跳超时阈值(毫秒)30000ms (30秒)
     heartbeat-timeout: 30000
 
     # 最小活跃实例数,最小实例池保持数量
-    min-active-instance: 3
+    min-active-instance: 10
 
     # 最大活跃实例数,资源上限
-    max-active-instance: 6
+    max-active-instance: 11
 
     # 任务积压阈值,触发临时扩容的 Kafka Lag 阈值 (> 100)
     task-backlog-threshold: 100
 
-    # 状态接口调用超时(毫秒),
-    status-query-timeout: 5000
+    # 状态接口调用超时(秒),
+    status-query-timeout: 5
+
+    # 解析任务执行超时(秒)
+    execute-timeout: 61
 
     # 状态接口连续失败次数阈值,用于判断实例是否失联
     status-query-fail-count: 3