浏览代码

修改扩缩容机制和消息重试

zsh 2 周之前
父节点
当前提交
e147072ab9
共有 32 个文件被更改,包括 328 次插入和 1862 次删除
  1. 19 12
      schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java
  2. 8 6
      schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java
  3. 4 4
      schedule-consumer/src/main/resources/application.yml
  4. 0 342
      schedule-manager/parser/core/router.py
  5. 0 10
      schedule-manager/parser/dockerfile
  6. 0 0
      schedule-manager/parser/examples/test1.pdf
  7. 0 25
      schedule-manager/parser/models/result.py
  8. 0 137
      schedule-manager/parser/parsers/audio_parser.py
  9. 0 282
      schedule-manager/parser/parsers/native_parser.py
  10. 0 43
      schedule-manager/parser/parsers/text_parser.py
  11. 0 151
      schedule-manager/parser/parsers/video_parser.py
  12. 0 336
      schedule-manager/parser/parsers/visual_parser.py
  13. 15 0
      schedule-manager/parser/readme.md
  14. 0 165
      schedule-manager/parser/utils/ffmpeg_wrapper.py
  15. 0 47
      schedule-manager/parser/utils/logger.py
  16. 0 51
      schedule-manager/parser/utils/mime_detector.py
  17. 0 80
      schedule-manager/parser/utils/stability.py
  18. 3 0
      schedule-manager/src/main/java/cn/com/yusys/manager/common/PortPool.java
  19. 0 4
      schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java
  20. 56 4
      schedule-manager/src/main/java/cn/com/yusys/manager/controller/TaskController.java
  21. 8 6
      schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/ProcessInstanceManager.java
  22. 4 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceManagementResponse.java
  23. 3 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatus.java
  24. 14 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/TaskLogRequest.java
  25. 123 147
      schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java
  26. 2 0
      schedule-manager/src/main/java/cn/com/yusys/manager/service/TaskRecordService.java
  27. 7 0
      schedule-manager/src/main/java/cn/com/yusys/manager/service/impl/TaskRecordServiceImpl.java
  28. 8 8
      schedule-manager/src/main/resources/application.yml
  29. 22 0
      schedule-producer/pom.xml
  30. 1 1
      schedule-producer/src/main/resources/application.yml
  31. 30 0
      schedule-producer/src/test/java/cn/com/yusys/producer/service/MessageSenderTest.java
  32. 1 1
      schedule-producer/src/test/java/cn/com/yusys/producer/service/util/FileTypeDetectorTest.java

+ 19 - 12
schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java

@@ -28,7 +28,7 @@ public class KafkaConsumerConfig {
     @Value("${spring.kafka.consumer.group-id}")
     private String groupId;
 
-    // 1. 死信生产者
+    // ===================== 死信队列生产者 =====================
     @Bean
     public ProducerFactory<String, String> dltProducerFactory() {
         Map<String, Object> props = new HashMap<>();
@@ -44,26 +44,34 @@ public class KafkaConsumerConfig {
         return new KafkaTemplate<>(dltProducerFactory());
     }
 
-    // 2. 死信恢复器
+    // ===================== 死信恢复器(自动发死信主题) =====================
     @Bean
     public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
+        // 默认规则:原主题名 + .DLT
+        // 例如 test_topic → test_topic.DLT
         return new DeadLetterPublishingRecoverer(dltKafkaTemplate());
     }
 
-    // 3. 错误处理器 (重试 3 次 -> 死信)
+    // ===================== 错误处理器(核心) =====================
     @Bean
     public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
+        // 重试间隔 1秒
         long interval = 1000L;
-        long maxAttempts = 3L;
 
-        // 构造函数传入恢复器和退避策略
-        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(interval, maxAttempts));
+        // 1次正常消费 + 2次重试 = 3次后进入死信
+        long maxRetryCount = 2L;
 
+        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer,
+                new FixedBackOff(interval, maxRetryCount));
 
+        // 非关键异常可以设置不重试(可选)
+        // handler.addNotRetryableExceptions(IllegalArgumentException.class);
+
+        log.info("Kafka 死信队列错误处理器初始化完成,最大重试次数:{}", maxRetryCount);
         return handler;
     }
 
-    // 4. 消费者工厂
+    // ===================== 消费者工厂 =====================
     @Bean
     public ConsumerFactory<String, String> consumerFactory() {
         Map<String, Object> props = new HashMap<>();
@@ -72,12 +80,12 @@ public class KafkaConsumerConfig {
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 必须 false
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         return new DefaultKafkaConsumerFactory<>(props);
     }
 
-    // 5. 监听器工厂 (整合 Ack 模式和 ErrorHandler)
+    // ===================== 监听器工厂 =====================
     @Bean
     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
             DefaultErrorHandler errorHandler) {
@@ -85,12 +93,11 @@ public class KafkaConsumerConfig {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
-        factory.setConcurrency(1);
+        factory.setConcurrency(24);
 
-        // 【核心】注入错误处理器
+        // 注入错误处理器
         factory.setCommonErrorHandler(errorHandler);
 
-        // 【核心】设置手动提交模式
         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
 
         return factory;

+ 8 - 6
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -34,7 +34,9 @@ public class MessageListener {
      * 监听多个 Topic
      * 注意:同一个 groupId 下的不同消费者实例会共同负载均衡消费这些 Topic 的所有分区。
      */
-    @KafkaListener(topics = "${kafka.topics.listen}", groupId = "${spring.kafka.consumer.group-id}")
+    @KafkaListener(topics = "#{'${kafka.topics.listen}'.split(',')}",
+            groupId = "${spring.kafka.consumer.group-id}",
+            concurrency = "24")
     public void listen(String message, ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
         // 打印当前消息来自哪个 Topic,方便区分
         log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
@@ -42,7 +44,7 @@ public class MessageListener {
 
         ExecuteResponse executeResponse = processBusinessLogic(message, record);
 
-        if (executeResponse != null && executeResponse.getCode()==200) {
+        if (executeResponse != null && executeResponse.getCode() == 200) {
             log.info("=== [CONSUMER] 任务处理成功 ===");
             acknowledgment.acknowledge();
 
@@ -82,17 +84,17 @@ public class MessageListener {
             taskRecordRequest.setLogPath(logPath);
             boolean recordCreated = taskRecordClient.createTaskRecord(taskRecordRequest);
             if (!recordCreated) {
-                log.warn("创建任务记录失败,任务ID:{}", taskId);
+                throw new RuntimeException("创建任务记录失败");
             }
             
             // 调用ParseServiceClient执行任务
             log.info("开始调用execute接口执行任务,消息内容:{}", message);
             ExecuteResponse response = parseServiceClient.executeTask(task);
-            log.info("任务执行完成,响应:{}", response);
+            log.info("任务执行完成,响应:{}", response == null ? null : response.getMessage());
             return response;
         } catch (Exception e) {
-            log.error("调用execute接口执行任务失败", e);
-            throw new RuntimeException("任务执行失败", e);
+            log.error("处理消息任务失败", e);
+            throw new RuntimeException("处理消息任务失败", e);
         }
     }
 }

+ 4 - 4
schedule-consumer/src/main/resources/application.yml

@@ -1,5 +1,5 @@
 server:
-  port: 8082
+  port: 1082
 
 spring:
   application:
@@ -16,8 +16,8 @@ spring:
 # 可选:在配置中定义 Topic 列表,方便管理
 kafka:
   topics:
-    listen: task_post_loan_medium
+    listen: task_voucher_medium,task_contract_high,task_credit_risk_low,task_credit_risk_medium,task_post_loan_low,task_customer_high,task_other_high,schedule-topic,task_institution_high,task_other_medium,task_task_high,task_contract_low,task_institution_medium,task_voucher_high,task_voucher_low,task_other_low,task_institution_low,task_credit_risk_high,task_customer_low,task_customer_medium,task_post_loan_high,task_contract_medium,task_post_loan_medium
 
 url:
-  parse: "http://127.0.0.1:8083//api/task/parse"
-  record: "http://127.0.0.1:8083/api/task/create-record"
+  parse: "http://127.0.0.1:1083/api/task/parse"
+  record: "http://127.0.0.1:1083/api/task/create-record"

+ 0 - 342
schedule-manager/parser/core/router.py

@@ -1,342 +0,0 @@
-from typing import Optional, Type
-from abc import ABC, abstractmethod
-import fitz  # PyMuPDF
-from utils.mime_detector import MimeDetector
-from utils.logger import log
-from models.result import ParseResult
-
-
-class Parser(ABC):
-    """解析器基类"""
-    
-    @abstractmethod
-    async def parse(self, file_path: str) -> ParseResult:
-        """
-        解析文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        pass
-    
-    def _get_file_size(self, file_path: str) -> int:
-        """
-        获取文件大小
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            int: 文件大小(字节)
-        """
-        import os
-        try:
-            return os.path.getsize(file_path)
-        except Exception:
-            return 0
-
-
-class ParserFactory:
-    """解析器工厂类"""
-    
-    def __init__(self):
-        self.mime_detector = MimeDetector()
-        self.parsers = {}
-        self.parser_instances = {}  # 缓存解析器实例
-        # 统计信息
-        self.stats = {
-            'total_files': 0,
-            'total_size': 0,
-            'text_files': 0,
-            'text_size': 0,
-            'image_files': 0,
-            'image_size': 0,
-            'audio_files': 0,
-            'audio_size': 0,
-            'video_files': 0,
-            'video_size': 0,
-            'pdf_files': 0,
-            'pdf_size': 0,
-            'office_files': 0,
-            'office_size': 0,
-            'total_time': 0,
-            'successful_files': 0,
-            'failed_files': 0
-        }
-    
-    def register_parser(self, mime_type: str, parser_class: Type[Parser]):
-        """
-        注册解析器
-        
-        Args:
-            mime_type: MIME类型
-            parser_class: 解析器类
-        """
-        self.parsers[mime_type] = parser_class
-    
-    async def get_parser(self, file_path: str) -> Parser:
-        """
-        根据文件类型和内容特征获取合适的解析器
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            Parser: 解析器实例
-        """
-        log.info(f"开始获取解析器,文件路径: {file_path}")
-        # 1. 检测文件MIME类型
-        mime_type = self.mime_detector.detect(file_path)
-        log.info(f"文件MIME类型: {mime_type}")
-        
-        # 2. 第一层路由:根据MIME类型分流
-        if mime_type.startswith("text/"):
-            log.info("检测到文本文件,使用TextParser")
-            if "TextParser" not in self.parser_instances:
-                from parsers.text_parser import TextParser
-                self.parser_instances["TextParser"] = TextParser()
-            return self.parser_instances["TextParser"]
-        elif mime_type.startswith("image/"):
-            log.info("检测到图片文件,使用VisualDocParser")
-            if "VisualDocParser" not in self.parser_instances:
-                from parsers.visual_parser import VisualDocParser
-                self.parser_instances["VisualDocParser"] = VisualDocParser()
-            return self.parser_instances["VisualDocParser"]
-        elif mime_type.startswith("audio/"):
-            log.info("检测到音频文件,使用AudioParser")
-            if "AudioParser" not in self.parser_instances:
-                from parsers.audio_parser import AudioParser
-                self.parser_instances["AudioParser"] = AudioParser()
-            return self.parser_instances["AudioParser"]
-        elif mime_type.startswith("video/"):
-            log.info("检测到视频文件,使用VideoParser")
-            if "VideoParser" not in self.parser_instances:
-                from parsers.video_parser import VideoParser
-                self.parser_instances["VideoParser"] = VideoParser()
-            return self.parser_instances["VideoParser"]
-        elif mime_type == "application/pdf":
-            # 3. 第二层路由:PDF特殊处理
-            log.info("检测到PDF文件,进入特殊路由")
-            return await self._route_pdf(file_path)
-        elif "openxmlformats" in mime_type or mime_type == "application/msword":
-            # Office文件处理(包括docx和doc)
-            log.info(f"检测到Office文件,MIME类型: {mime_type},使用NativeDocParser")
-            return await self._route_office(file_path, mime_type)
-        else:
-            log.error(f"不支持的文件类型: {mime_type}")
-            raise Exception(f"不支持的文件类型: {mime_type}")
-    
-    async def _route_pdf(self, file_path: str) -> Parser:
-        """
-        PDF文件路由逻辑
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            Parser: 解析器实例
-        """
-        # 检测PDF是否为扫描件(文本密度检测)
-        if self._is_scanned_pdf(file_path):
-            log.info("PDF为扫描件,使用VisualDocParser")
-            if "VisualDocParser" not in self.parser_instances:
-                from parsers.visual_parser import VisualDocParser
-                self.parser_instances["VisualDocParser"] = VisualDocParser()
-            return self.parser_instances["VisualDocParser"]
-        else:
-            log.info("PDF为原生文档,使用NativeDocParser")
-            if "NativeDocParser" not in self.parser_instances:
-                from parsers.native_parser import NativeDocParser
-                self.parser_instances["NativeDocParser"] = NativeDocParser()
-            return self.parser_instances["NativeDocParser"]
-    
-    async def _route_office(self, file_path: str, mime_type: str) -> Parser:
-        """
-        Office文件路由逻辑
-        
-        Args:
-            file_path: 文件路径
-            mime_type: MIME类型
-            
-        Returns:
-            Parser: 解析器实例
-        """
-        if "NativeDocParser" not in self.parser_instances:
-            from parsers.native_parser import NativeDocParser
-            self.parser_instances["NativeDocParser"] = NativeDocParser()
-        return self.parser_instances["NativeDocParser"]
-    
-    def _is_scanned_pdf(self, file_path: str) -> bool:
-        """
-        检测PDF是否为扫描件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            bool: 是否为扫描件
-        """
-        try:
-            doc = fitz.open(file_path)
-            text_content = ""
-            # 提取前3页文本
-            for page_num in range(min(3, len(doc))):
-                page = doc[page_num]
-                text_content += page.get_text()
-            doc.close()
-            
-            # 计算有效字符数
-            valid_chars = len([c for c in text_content if c.isalnum() or c.isspace()])
-            log.info(f"PDF前3页有效字符数: {valid_chars}")
-            
-            # 如果有效字符数少于50,认为是扫描件
-            return valid_chars < 50
-        except Exception as e:
-            log.error(f"PDF文本提取失败: {str(e)}")
-            # 提取失败时默认使用VisualDocParser
-            return True
-    
-    async def parse(self, file_path: str) -> ParseResult:
-        """
-        解析文件的入口方法
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        import time
-        import os
-        
-        start_time = time.time()
-        file_size = 0
-        
-        try:
-            file_size = os.path.getsize(file_path)
-        except Exception:
-            pass
-        
-        log.info(f"开始解析文件: {file_path}, 文件大小: {file_size / (1024 * 1024):.2f} MB")
-        
-        try:
-            parser = await self.get_parser(file_path)
-            log.info(f"获取到解析器: {parser.__class__.__name__}")
-            
-            result = await parser.parse(file_path)
-            
-            end_time = time.time()
-            elapsed_time = end_time - start_time
-            
-            # 更新统计信息
-            self.stats['total_files'] += 1
-            self.stats['total_size'] += file_size
-            self.stats['total_time'] += elapsed_time
-            self.stats['successful_files'] += 1
-            
-            # 根据文件类型更新统计
-            file_type = result.file_type
-            if file_type.startswith('text'):
-                self.stats['text_files'] += 1
-                self.stats['text_size'] += file_size
-            elif file_type.startswith('image') or file_type == 'visual':
-                self.stats['image_files'] += 1
-                self.stats['image_size'] += file_size
-            elif file_type.startswith('audio'):
-                self.stats['audio_files'] += 1
-                self.stats['audio_size'] += file_size
-            elif file_type.startswith('video'):
-                self.stats['video_files'] += 1
-                self.stats['video_size'] += file_size
-            elif file_type == 'pdf' or file_type == 'pdf_scanned':
-                self.stats['pdf_files'] += 1
-                self.stats['pdf_size'] += file_size
-            elif file_type == 'office':
-                self.stats['office_files'] += 1
-                self.stats['office_size'] += file_size
-            
-            # 解析结果日志
-            log.info(f"文件解析完成,耗时: {elapsed_time:.2f} 秒")
-            log.info(f"文件类型: {result.file_type}")
-            log.info(f"解析内容长度: {len(result.content)} 字符")
-            log.info(f"元数据: {result.metadata}")
-            if result.tables:
-                log.info(f"提取到表格数量: {len(result.tables)}")
-            
-            return result
-        except Exception as e:
-            end_time = time.time()
-            elapsed_time = end_time - start_time
-            
-            # 更新统计信息
-            self.stats['total_files'] += 1
-            self.stats['total_size'] += file_size
-            self.stats['total_time'] += elapsed_time
-            self.stats['failed_files'] += 1
-            
-            log.error(f"解析失败: {str(e)}, 耗时: {elapsed_time:.2f} 秒")
-            # 返回错误结果
-            return ParseResult(
-                content=f"解析失败: {str(e)}",
-                metadata={"error": str(e)},
-                file_type="error"
-            )
-    
-    def generate_performance_report(self) -> str:
-        """
-        生成性能报告
-        
-        Returns:
-            str: 性能报告
-        """
-        stats = self.stats
-        
-        # 计算各项指标
-        total_files = stats['total_files']
-        total_size = stats['total_size']
-        total_time = stats['total_time']
-        successful_files = stats['successful_files']
-        failed_files = stats['failed_files']
-        
-        # 计算各类文件占比
-        text_ratio = (stats['text_size'] / total_size * 100) if total_size > 0 else 0
-        image_ratio = (stats['image_size'] / total_size * 100) if total_size > 0 else 0
-        audio_ratio = (stats['audio_size'] / total_size * 100) if total_size > 0 else 0
-        video_ratio = (stats['video_size'] / total_size * 100) if total_size > 0 else 0
-        pdf_ratio = (stats['pdf_size'] / total_size * 100) if total_size > 0 else 0
-        office_ratio = (stats['office_size'] / total_size * 100) if total_size > 0 else 0
-        
-        # 计算解析速度
-        total_size_mb = total_size / (1024 * 1024)
-        avg_speed = (total_size_mb / total_time) if total_time > 0 else 0
-        
-        # 生成报告
-        report = f"""# 解析性能报告
-
-## 总体情况
-- 总解析文件数: {total_files}
-- 成功解析: {successful_files}
-- 解析失败: {failed_files}
-- 总文件大小: {total_size_mb:.2f} MB
-- 总耗时: {total_time:.2f} 秒
-- 平均解析速度: {avg_speed:.2f} MB/秒
-
-## 文件类型分布
-- 文本文件: {stats['text_files']} 个, {stats['text_size'] / (1024 * 1024):.2f} MB, 占比: {text_ratio:.2f}%
-- 图片文件: {stats['image_files']} 个, {stats['image_size'] / (1024 * 1024):.2f} MB, 占比: {image_ratio:.2f}%
-- 音频文件: {stats['audio_files']} 个, {stats['audio_size'] / (1024 * 1024):.2f} MB, 占比: {audio_ratio:.2f}%
-- 视频文件: {stats['video_files']} 个, {stats['video_size'] / (1024 * 1024):.2f} MB, 占比: {video_ratio:.2f}%
-- PDF文件: {stats['pdf_files']} 个, {stats['pdf_size'] / (1024 * 1024):.2f} MB, 占比: {pdf_ratio:.2f}%
-- Office文件: {stats['office_files']} 个, {stats['office_size'] / (1024 * 1024):.2f} MB, 占比: {office_ratio:.2f}%
-
-## 性能分析
-- 文本类平均解析速度: {(stats['text_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有文本文件)
-- 图片类平均解析速度: {(stats['image_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有图片文件)
-- 音频类平均解析速度: {(stats['audio_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有音频文件)
-- 视频类平均解析速度: {(stats['video_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有视频文件)
-"""
-        
-        return report

+ 0 - 10
schedule-manager/parser/dockerfile

@@ -14,16 +14,6 @@ ENV PYTHONIOENCODING=utf-8 \
 # ========== 系统依赖安装 ==========
 RUN apt-get update && \
     apt-get install -y --no-install-recommends \
-    libglib2.0-0 \
-    libsm6 \
-    libxext6 \
-    libxrender-dev \
-    libgomp1 \
-    libgthread-2.0-0 \
-    libgtk-3-0 \
-    libgstreamer1.0-0 \
-    libgstreamer-plugins-base1.0-0 \
-    ffmpeg \
     ca-certificates \
     && rm -rf /var/lib/apt/lists/*
 

+ 0 - 0
schedule-manager/parser/examples/test1.pdf


+ 0 - 25
schedule-manager/parser/models/result.py

@@ -1,25 +0,0 @@
-from dataclasses import dataclass, field
-from typing import Dict, List, Optional
-
-
-@dataclass
-class ParseResult:
-    """解析结果的统一输出结构"""
-    content: str = ""  # 解析出的 Markdown 文本
-    metadata: Dict[str, any] = field(default_factory=dict)  # 页数、作者、时长等元数据
-    file_type: str = ""  # 识别出的具体类型
-    tables: List[Dict] = field(default_factory=list)  # 提取出的结构化表格数据
-
-    def to_dict(self) -> Dict[str, any]:
-        """转换为字典格式"""
-        return {
-            "content": self.content,
-            "metadata": self.metadata,
-            "file_type": self.file_type,
-            "tables": self.tables
-        }
-
-    def to_json(self) -> str:
-        """转换为JSON字符串"""
-        import json
-        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

+ 0 - 137
schedule-manager/parser/parsers/audio_parser.py

@@ -1,137 +0,0 @@
-from core.router import Parser
-from models.result import ParseResult
-from utils.logger import log
-from utils.ffmpeg_wrapper import FFmpegWrapper
-import os
-import tempfile
-import requests
-
-
-class AudioParser(Parser):
-    """音频文件解析器"""
-    
-    def __init__(self):
-        self.ffmpeg = FFmpegWrapper()
-        # Qwen3-ASR模型配置 - 使用专门的音频转录端点
-        self.qwen_asr_api_url = "http://10.192.72.13:7283/v1/audio/transcriptions"
-        log.info("音频解析器初始化完成,使用本地部署的Qwen3-ASR模型")
-    
-    async def parse(self, file_path: str) -> ParseResult:
-        """
-        解析音频文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        log.info(f"开始解析音频文件: {file_path}")
-        temp_wav_path = None
-        try:
-            # 1. 预处理:转换为16k/16bit/mono wav
-            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
-                temp_wav_path = temp_file.name
-            
-            log.info(f"创建临时文件: {temp_wav_path}")
-            self.ffmpeg.convert_audio(file_path, temp_wav_path)
-            log.info(f"音频转换完成: {temp_wav_path}")
-            
-            # 2. 使用Qwen3-ASR进行语音识别
-            log.info("开始使用Qwen3-ASR进行语音识别...")
-            log.info(f"使用的API地址: {self.qwen_asr_api_url}")
-            
-            content = []
-            result = None
-            try:
-                # 检查文件大小
-                file_size = os.path.getsize(temp_wav_path)
-                log.info(f"音频文件大小: {file_size / (1024 * 1024):.2f} MB")
-                
-                # 使用专门的音频转录端点,支持文件上传
-                log.info("准备使用文件上传方式进行语音识别...")
-                
-                # 构建请求数据
-                session = requests.Session()
-                session.trust_env = False  # 禁用环境变量中的代理设置
-                
-                # 使用multipart/form-data上传文件
-                files = {
-                    'file': ('audio.wav', open(temp_wav_path, 'rb'), 'audio/wav')
-                }
-                data = {
-                    'model': '/data/shared/Qwen3-ASR/qwen/Qwen3-ASR-1.7B',
-                    'language': 'zh',
-                    'response_format': 'json'
-                }
-                
-                # 发送请求
-                log.info("开始发送请求...")
-                # 增加超时时间,音频处理可能需要更长时间
-                response = session.post(self.qwen_asr_api_url, files=files, data=data, timeout=600)
-                log.info(f"请求完成,状态码: {response.status_code}")
-                
-                # 打印响应内容以进行调试
-                if response.status_code != 200:
-                    log.warning(f"响应内容: {response.text}")
-                
-                response.raise_for_status()
-                result = response.json()
-                
-                log.info("Qwen3-ASR语音识别完成")
-                log.info(f"识别结果: {result}")
-                
-                # 3. 构建解析结果
-                if result and 'text' in result:
-                    full_text = result['text']
-                    # 清理识别结果中的标记
-                    clean_text = full_text.replace('language Chinese<asr_text>', '').strip()
-                    content.append(f"完整文本: {clean_text}")
-                else:
-                    log.warning("解析失败:未获取到有效结果")
-                    content.append("解析失败:未获取到有效结果")
-                
-                log.info(f"构建完成,内容长度: {len(content)}")
-            except requests.exceptions.Timeout as e:
-                log.error(f"Qwen3-ASR语音识别超时: {str(e)}")
-                content.append("语音识别超时,请检查服务是否正常运行")
-            except requests.exceptions.ConnectionError as e:
-                log.error(f"Qwen3-ASR语音识别连接错误: {str(e)}")
-                content.append("语音识别连接错误,请检查服务地址是否正确")
-            except Exception as e:
-                log.error(f"Qwen3-ASR语音识别失败: {str(e)}")
-                import traceback
-                log.error(f"异常堆栈: {traceback.format_exc()}")
-                # 即使失败,也尝试返回一个基本结果
-                content.append("语音识别失败,但文件已成功处理")
-            
-            # 清理临时文件
-            if temp_wav_path and os.path.exists(temp_wav_path):
-                os.remove(temp_wav_path)
-                log.info(f"临时文件已清理: {temp_wav_path}")
-            
-            return ParseResult(
-                content="\n".join(content),
-                metadata={
-                    "parser": "Qwen3-ASR",
-                    "file_size": os.path.getsize(file_path),
-                    "api_url": self.qwen_asr_api_url
-                },
-                file_type="audio"
-            )
-        except Exception as e:
-            log.error(f"音频文件解析失败: {str(e)}")
-            import traceback
-            log.error(f"异常堆栈: {traceback.format_exc()}")
-            # 清理临时文件
-            if temp_wav_path and os.path.exists(temp_wav_path):
-                try:
-                    os.remove(temp_wav_path)
-                    log.info(f"临时文件已清理: {temp_wav_path}")
-                except:
-                    pass
-            return ParseResult(
-                content="",
-                metadata={"error": str(e)},
-                file_type="audio"
-            )

+ 0 - 282
schedule-manager/parser/parsers/native_parser.py

@@ -1,282 +0,0 @@
-from core.router import Parser
-from models.result import ParseResult
-from utils.logger import log
-import fitz  # PyMuPDF
-from docx import Document
-import openpyxl
-from pptx import Presentation
-import os
-
-
-class NativeDocParser(Parser):
-    """原生文档解析器,处理Office文档和原生PDF"""
-    
-    async def parse(self, file_path: str) -> ParseResult:
-        """
-        解析原生文档
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        log.info(f"开始解析原生文档: {file_path}")
-        try:
-            # 根据文件扩展名判断文件类型
-            ext = os.path.splitext(file_path)[1].lower()
-            
-            if ext == '.pdf':
-                return await self._parse_pdf(file_path)
-            elif ext == '.docx':
-                return await self._parse_docx(file_path)
-            elif ext == '.doc':
-                return await self._parse_doc(file_path)
-            elif ext == '.xlsx':
-                return await self._parse_xlsx(file_path)
-            elif ext == '.pptx':
-                return await self._parse_pptx(file_path)
-            else:
-                raise Exception(f"不支持的文件扩展名: {ext}")
-        except Exception as e:
-            log.error(f"原生文档解析失败: {str(e)}")
-            return ParseResult(
-                content="",
-                metadata={"error": str(e)},
-                file_type="unknown"
-            )
-    
-    async def _parse_pdf(self, file_path: str) -> ParseResult:
-        """
-        解析PDF文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        doc = fitz.open(file_path)
-        content = []
-        tables = []
-        page_count = len(doc)
-        
-        # 遍历所有页面
-        for page_num in range(page_count):
-            page = doc[page_num]
-            # 提取文本
-            text = page.get_text()
-            content.append(f"# 第{page_num + 1}页\n{text}")
-            
-            # 提取表格(PyMuPDF的表格提取功能有限)
-            # 这里可以根据需要使用更高级的表格提取库
-        
-        doc.close()
-        
-        return ParseResult(
-            content="\n\n".join(content),
-            metadata={
-                "page_count": page_count,
-                "file_size": os.path.getsize(file_path)
-            },
-            file_type="pdf",
-            tables=tables
-        )
-    
-    async def _parse_docx(self, file_path: str) -> ParseResult:
-        """
-        解析Word文档
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        doc = Document(file_path)
-        content = []
-        tables = []
-        
-        # 提取标题和正文
-        for para in doc.paragraphs:
-            if para.style.name.startswith('Heading'):
-                # 根据标题级别添加Markdown标题
-                level = int(para.style.name.split(' ')[1])
-                content.append(f"{'#' * level} {para.text}")
-            else:
-                content.append(para.text)
-        
-        # 提取表格
-        for table_idx, table in enumerate(doc.tables):
-            table_content = []
-            table_data = []
-            
-            # 提取表头
-            header_cells = table.rows[0].cells
-            header = [cell.text.strip() for cell in header_cells]
-            table_content.append('| ' + ' | '.join(header) + ' |')
-            table_content.append('| ' + ' | '.join(['---'] * len(header)) + ' |')
-            table_data.append(header)
-            
-            # 提取表格内容
-            for row in table.rows[1:]:
-                cells = row.cells
-                row_data = [cell.text.strip() for cell in cells]
-                table_content.append('| ' + ' | '.join(row_data) + ' |')
-                table_data.append(row_data)
-            
-            content.append('\n'.join(table_content))
-            tables.append({
-                "table_id": table_idx,
-                "data": table_data
-            })
-        
-        return ParseResult(
-            content="\n".join(content),
-            metadata={
-                "paragraph_count": len(doc.paragraphs),
-                "table_count": len(doc.tables),
-                "file_size": os.path.getsize(file_path)
-            },
-            file_type="docx",
-            tables=tables
-        )
-    
-    async def _parse_xlsx(self, file_path: str) -> ParseResult:
-        """
-        解析Excel文档
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        wb = openpyxl.load_workbook(file_path)
-        content = []
-        tables = []
-        
-        # 遍历所有工作表
-        for sheet_idx, sheet_name in enumerate(wb.sheetnames):
-            sheet = wb[sheet_name]
-            content.append(f"# 工作表: {sheet_name}")
-            
-            # 提取表格数据
-            table_data = []
-            max_row = sheet.max_row
-            max_col = sheet.max_column
-            
-            # 提取表头
-            header = []
-            for col in range(1, max_col + 1):
-                cell_value = sheet.cell(row=1, column=col).value
-                header.append(str(cell_value) if cell_value else '')
-            table_data.append(header)
-            
-            # 提取表格内容
-            for row in range(2, max_row + 1):
-                row_data = []
-                for col in range(1, max_col + 1):
-                    cell_value = sheet.cell(row=row, column=col).value
-                    row_data.append(str(cell_value) if cell_value else '')
-                table_data.append(row_data)
-            
-            # 转换为Markdown表格
-            if header:
-                markdown_table = []
-                markdown_table.append('| ' + ' | '.join(header) + ' |')
-                markdown_table.append('| ' + ' | '.join(['---'] * len(header)) + ' |')
-                for row_data in table_data[1:]:
-                    markdown_table.append('| ' + ' | '.join(row_data) + ' |')
-                content.append('\n'.join(markdown_table))
-            
-            tables.append({
-                "sheet_name": sheet_name,
-                "data": table_data
-            })
-        
-        wb.close()
-        
-        return ParseResult(
-            content="\n\n".join(content),
-            metadata={
-                "sheet_count": len(wb.sheetnames),
-                "file_size": os.path.getsize(file_path)
-            },
-            file_type="xlsx",
-            tables=tables
-        )
-    
-    async def _parse_pptx(self, file_path: str) -> ParseResult:
-        """
-        解析PPT文档
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        prs = Presentation(file_path)
-        content = []
-        
-        # 遍历所有幻灯片
-        for slide_idx, slide in enumerate(prs.slides):
-            content.append(f"# 幻灯片 {slide_idx + 1}")
-            
-            # 提取标题
-            for shape in slide.shapes:
-                if hasattr(shape, 'text_frame') and shape.text_frame.text:
-                    if shape == slide.shapes[0]:  # 假设第一个形状是标题
-                        content.append(f"## {shape.text_frame.text}")
-                    else:
-                        content.append(shape.text_frame.text)
-            
-            # 提取备注
-            if slide.notes_slide:
-                notes = slide.notes_slide.notes_text_frame.text
-                if notes:
-                    content.append(f"### 备注\n{notes}")
-        
-        return ParseResult(
-            content="\n\n".join(content),
-            metadata={
-                "slide_count": len(prs.slides),
-                "file_size": os.path.getsize(file_path)
-            },
-            file_type="pptx"
-        )
-    
-    async def _parse_doc(self, file_path: str) -> ParseResult:
-        """
-        解析.doc文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        # 使用antiword提取.doc文件内容
-        import subprocess
-        try:
-            result = subprocess.run(
-                ['antiword', file_path],
-                capture_output=True,
-                text=True,
-                check=True
-            )
-            text = result.stdout
-        except Exception as e:
-            log.error(f"antiword解析失败: {str(e)}")
-            raise Exception(f"antiword解析失败: {str(e)}")
-        
-        content = [text]
-        
-        return ParseResult(
-            content="\n".join(content),
-            metadata={
-                "file_size": os.path.getsize(file_path)
-            },
-            file_type="doc"
-        )

+ 0 - 43
schedule-manager/parser/parsers/text_parser.py

@@ -1,43 +0,0 @@
-from core.router import Parser
-from models.result import ParseResult
-from utils.logger import log
-
-
-class TextParser(Parser):
-    """文本文件解析器"""
-    
-    async def parse(self, file_path: str) -> ParseResult:
-        """
-        解析文本文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        log.info(f"开始解析文本文件: {file_path}")
-        try:
-            # 读取文本文件内容
-            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
-                content = f.read()
-            
-            # 构建解析结果
-            result = ParseResult(
-                content=content,
-                metadata={
-                    "file_size": len(content),
-                    "line_count": len(content.split('\n'))
-                },
-                file_type="text"
-            )
-            
-            log.info(f"文本文件解析完成,大小: {len(content)} 字符")
-            return result
-        except Exception as e:
-            log.error(f"文本文件解析失败: {str(e)}")
-            return ParseResult(
-                content="",
-                metadata={"error": str(e)},
-                file_type="text"
-            )

+ 0 - 151
schedule-manager/parser/parsers/video_parser.py

@@ -1,151 +0,0 @@
-from core.router import Parser
-from models.result import ParseResult
-from utils.logger import log
-from utils.ffmpeg_wrapper import FFmpegWrapper
-import os
-import tempfile
-import base64
-import requests
-from parsers.audio_parser import AudioParser
-
-
-class VideoParser(Parser):
-    """视频文件解析器"""
-    
-    def __init__(self):
-        self.ffmpeg = FFmpegWrapper()
-        self.audio_parser = AudioParser()
-        # Qwen3-VL模型配置
-        self.qwen_api_url = "http://10.192.72.13:7280/v1/chat/completions"
-    
-    async def parse(self, file_path: str) -> ParseResult:
-        """
-        解析视频文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        log.info(f"开始解析视频文件: {file_path}")
-        try:
-            # 1. 提取音频轨道
-            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
-                temp_audio_path = temp_file.name
-            
-            self.ffmpeg.extract_audio(file_path, temp_audio_path)
-            log.info(f"音频提取完成: {temp_audio_path}")
-            
-            # 2. 使用AudioParser解析音频
-            audio_result = await self.audio_parser.parse(temp_audio_path)
-            log.info("音频解析完成")
-            
-            # 3. 提取关键帧
-            frame_results = []  # 移到外部定义
-            with tempfile.TemporaryDirectory() as temp_dir:
-                # 使用固定频率先提取帧,再通过帧差法筛选关键帧
-                interval_seconds = 10
-                diff_threshold = 15.0
-                keyframes = self.ffmpeg.extract_keyframes(file_path, temp_dir, interval=interval_seconds, diff_threshold=diff_threshold)
-                log.info(f"关键帧提取完成(帧差阈值={diff_threshold}),共{len(keyframes)}张")
-
-                # 4. 使用Qwen3-VL解析关键帧;根据帧文件名计算时间点
-                for idx, frame_path in enumerate(keyframes):
-                    try:
-                        frame_content = self._parse_frame_with_qwen(frame_path)
-                        log.info(f"解析关键帧 {idx+1} 结果长度: {len(frame_content) if frame_content else 0}")
-                        if frame_content:
-                            # 从文件名解析帧序号,filename like frame_000001.jpg
-                            try:
-                                base = os.path.basename(frame_path)
-                                num_part = base.split('_')[1].split('.')[0]
-                                frame_index = int(num_part)
-                                time_second = (frame_index - 1) * interval_seconds
-                            except Exception:
-                                time_second = idx * interval_seconds
-
-                            frame_results.append((time_second, frame_content))
-                            log.info(f"添加关键帧 到结果列表,时间:{time_second}s")
-                        else:
-                            log.warning(f"关键帧 {idx+1} 解析结果为空")
-                    except Exception as e:
-                        log.warning(f"解析关键帧 {idx+1} 失败: {str(e)}")
-            
-            log.info(f"关键帧解析完成,frame_results长度: {len(frame_results)}")
-            
-            # 5. 合并结果
-            content = []
-            content.append("# 音频内容")
-            content.append(audio_result.content)
-            
-            if frame_results:
-                log.info("开始添加画面内容到结果")
-                content.append("\n# 画面内容")
-                for time_second, frame_content in frame_results:
-                    content.append(f"\n## 第{time_second}秒")
-                    content.append(frame_content)
-                    log.info(f"添加第{time_second}秒画面内容,长度: {len(frame_content)}")
-            else:
-                log.warning("没有画面内容可以添加")
-            
-            # 清理临时文件
-            if os.path.exists(temp_audio_path):
-                os.remove(temp_audio_path)
-            
-            return ParseResult(
-                content="\n".join(content),
-                metadata={
-                    "parser": "VideoParser",
-                    "file_size": os.path.getsize(file_path),
-                    "audio_parser": "Qwen3-ASR",
-                    "visual_parser": "Qwen3-VL",
-                    "keyframe_count": len(keyframes)
-                },
-                file_type="video"
-            )
-        except Exception as e:
-            log.error(f"视频文件解析失败: {str(e)}")
-            # 清理临时文件
-            if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path):
-                os.remove(temp_audio_path)
-            return ParseResult(
-                content="",
-                metadata={"error": str(e)},
-                file_type="video"
-            )
-    
-    def _parse_frame_with_qwen(self, image_path: str) -> str:
-        """
-        使用Qwen3-VL模型解析图片
-        
-        Args:
-            image_path: 图片路径
-            
-        Returns:
-            str: 解析结果
-        """
-        log.info(f"使用Qwen3-VL解析图片: {image_path}")
-        
-        # 编码图片
-        with open(image_path, "rb") as f:
-            base64_image = base64.b64encode(f.read()).decode("utf-8")
-
-        # 发送请求
-        payload = {
-            "model": "/model",
-            "messages": [{
-                "role": "user",
-                "content": [
-                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
-                    {"type": "text", "text": "详细描述这张图片的内容,包括人物、物体、场景、文字等所有可见信息"}
-                ]
-            }],
-            "max_tokens": 512
-        }
-
-        response = requests.post(self.qwen_api_url, json=payload, timeout=120)
-        response.raise_for_status()
-        result = response.json()
-        
-        return result["choices"][0]["message"]["content"]

+ 0 - 336
schedule-manager/parser/parsers/visual_parser.py

@@ -1,336 +0,0 @@
-from core.router import Parser
-from models.result import ParseResult
-from utils.logger import log
-import os
-import time
-import httpx
-import json
-import io
-import zipfile
-from pathlib import Path
-import asyncio
-
-# 延迟导入PaddleOCR,避免模块级初始化
-
-
-class VisualDocParser(Parser):
-    """视觉文档解析器,处理图片和扫描件PDF"""
-    
-    def __init__(self):
-        # MinerU API配置 - 使用本地部署的服务
-        self.mineru_api_key = ""
-        self.base_url = "http://10.192.72.13:7284"
-        self.model_version = "hybrid-auto-engine"
-        self.poll_interval_sec = 3.0
-        self.max_wait_sec = 300.0
-        log.info("VisualDocParser初始化完成,使用本地部署的MinerU服务")
-    
-    async def parse(self, file_path: str) -> ParseResult:
-        """
-        解析视觉文档
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        log.info(f"开始解析视觉文档: {file_path}")
-        try:
-            # 只使用MinerU API,避免PaddleOCR的初始化问题
-            result = await self._try_mineru(file_path)
-            if result:
-                return result
-            
-            # MinerU失败时,返回错误信息
-            return ParseResult(
-                content="",
-                metadata={"error": "MinerU API解析失败"},
-                file_type="visual"
-            )
-        except Exception as e:
-            log.error(f"视觉文档解析失败: {str(e)}")
-            return ParseResult(
-                content="",
-                metadata={"error": str(e)},
-                file_type="visual"
-            )
-    
-    async def _try_mineru(self, file_path: str) -> ParseResult:
-        """
-        尝试使用本地MinerU API解析
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果,如果失败返回None
-        """
-        try:
-            log.info("开始使用本地MinerU API解析文件")
-
-            file_path_obj = Path(file_path)
-            if not file_path_obj.exists():
-                raise FileNotFoundError(str(file_path))
-
-            log.info(f"Calling local MinerU for file: {file_path}")
-
-            # 直接使用本地API上传文件并获取结果
-            result = await self._upload_and_parse(file_path_obj)
-            
-            # 提取文本内容
-            text_content = self._extract_text_from_local_result(result)
-            
-            return ParseResult(
-                content=text_content,
-                metadata={
-                    "parser": "Local MinerU API",
-                    "file_size": file_path_obj.stat().st_size,
-                    "backend": self.model_version
-                },
-                file_type="visual"
-            )
-            
-        except Exception as e:
-            log.warning(f"本地MinerU API解析失败: {str(e)}")
-            return None
-    
-    async def _upload_and_parse(self, file_path: Path) -> dict:
-        """
-        上传文件并解析
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            dict: 解析结果
-        """
-        url = f"{self.base_url}/file_parse"
-        
-        # 准备表单数据
-        files = {
-            "files": (file_path.name, open(file_path, 'rb'))
-        }
-        
-        # 准备参数
-        data = {
-            "backend": self.model_version,
-            "lang_list": ["ch"],
-            "return_md": True,
-            "formula_enable": True,
-            "table_enable": True
-        }
-
-        async with httpx.AsyncClient(timeout=300) as client:
-            resp = await client.post(url, files=files, data=data)
-            resp.raise_for_status()
-            result = resp.json()
-
-        return result
-
-    def _extract_text_from_local_result(self, result: dict) -> str:
-        """
-        从本地MinerU返回的结果中提取文本内容
-        
-        Args:
-            result: 本地MinerU返回的结果
-            
-        Returns:
-            str: 提取的文本内容
-        """
-        # 处理不同可能的返回结构
-        text_parts = []
-        
-        if isinstance(result, dict):
-            # 检查是否有results字段(新的返回结构)
-            if "results" in result:
-                results = result["results"]
-                if isinstance(results, dict):
-                    for key, value in results.items():
-                        if isinstance(value, dict):
-                            # 检查是否有md_content字段
-                            if "md_content" in value:
-                                text_parts.append(str(value["md_content"]))
-                            # 检查是否有text字段
-                            elif "text" in value:
-                                text_parts.append(str(value["text"]))
-            # 检查是否有markdown内容
-            elif "markdown" in result:
-                text_parts.append(str(result["markdown"]))
-            # 检查是否有text字段
-            elif "text" in result:
-                text_parts.append(str(result["text"]))
-            # 检查是否有content字段
-            elif "content" in result:
-                if isinstance(result["content"], str):
-                    text_parts.append(result["content"])
-                elif isinstance(result["content"], list):
-                    for item in result["content"]:
-                        if isinstance(item, dict) and "text" in item:
-                            text_parts.append(str(item["text"]))
-                        elif isinstance(item, str):
-                            text_parts.append(item)
-        
-        return "\n\n".join(text_parts)
-    
-    def _safe_stem(self, stem: str) -> str:
-        """
-        创建安全的缓存键
-        
-        Args:
-            stem: 文件stem
-            
-        Returns:
-            str: 安全的缓存键
-        """
-        import re
-        return re.sub(r'[^a-zA-Z0-9_-]', '_', stem)
-    
-    def _extract_text_from_payload(self, payload: dict) -> str:
-        """
-        从MinerU返回的payload中提取文本内容
-        
-        Args:
-            payload: MinerU返回的payload
-            
-        Returns:
-            str: 提取的文本内容
-        """
-        # 根据MinerU API返回的结构提取文本
-        text_parts = []
-        
-        # 处理不同可能的返回结构
-        if isinstance(payload, dict):
-            # 检查是否有text字段
-            if "text" in payload:
-                text_parts.append(str(payload["text"]))
-            # 检查是否有content字段
-            elif "content" in payload:
-                if isinstance(payload["content"], str):
-                    text_parts.append(payload["content"])
-                elif isinstance(payload["content"], list):
-                    for item in payload["content"]:
-                        if isinstance(item, dict) and "text" in item:
-                            text_parts.append(str(item["text"]))
-                        elif isinstance(item, str):
-                            text_parts.append(item)
-            # 检查是否有pages字段
-            elif "pages" in payload:
-                for page_num, page_content in enumerate(payload["pages"], 1):
-                    text_parts.append(f"# 第{page_num}页")
-                    if isinstance(page_content, str):
-                        text_parts.append(page_content)
-                    elif isinstance(page_content, dict) and "text" in page_content:
-                        text_parts.append(str(page_content["text"]))
-        elif isinstance(payload, list):
-            for item in payload:
-                if isinstance(item, dict) and "text" in item:
-                    text_parts.append(str(item["text"]))
-                elif isinstance(item, str):
-                    text_parts.append(item)
-        
-        return "\n\n".join(text_parts)
-    
-    async def _use_paddleocr(self, file_path: str) -> ParseResult:
-        """
-        使用PaddleOCR解析
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        log.info("使用PaddleOCR解析视觉文档")
-        
-        # 检查PaddleOCR是否初始化成功
-        if self.ocr is None:
-            log.error("PaddleOCR未初始化,无法解析")
-            return ParseResult(
-                content="",
-                metadata={"error": "PaddleOCR未初始化"},
-                file_type="visual"
-            )
-        
-        # 对于PDF文件,需要先转换为图片
-        if file_path.endswith('.pdf'):
-            return await self._ocr_pdf(file_path)
-        else:
-            return await self._ocr_image(file_path)
-    
-    async def _ocr_pdf(self, file_path: str) -> ParseResult:
-        """
-        OCR处理PDF文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        import fitz  # PyMuPDF
-        
-        doc = fitz.open(file_path)
-        content = []
-        page_count = len(doc)
-        
-        # 遍历所有页面
-        for page_num in range(page_count):
-            page = doc[page_num]
-            # 将页面转换为图片
-            pix = page.get_pixmap(dpi=300)
-            img_path = f"temp_page_{page_num}.png"
-            pix.save(img_path)
-            
-            # OCR处理图片
-            ocr_result = self.ocr.ocr(img_path, cls=True)
-            page_text = []
-            
-            for line in ocr_result:
-                for word_info in line:
-                    page_text.append(word_info[1][0])
-            
-            content.append(f"# 第{page_num + 1}页\n{' '.join(page_text)}")
-            
-            # 删除临时图片
-            if os.path.exists(img_path):
-                os.remove(img_path)
-        
-        doc.close()
-        
-        return ParseResult(
-            content="\n\n".join(content),
-            metadata={
-                "parser": "PaddleOCR",
-                "page_count": page_count,
-                "file_size": os.path.getsize(file_path)
-            },
-            file_type="pdf_scanned"
-        )
-    
-    async def _ocr_image(self, file_path: str) -> ParseResult:
-        """
-        OCR处理图片文件
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            ParseResult: 解析结果
-        """
-        # 使用PaddleOCR识别图片
-        ocr_result = self.ocr.ocr(file_path, cls=True)
-        content = []
-        
-        for line in ocr_result:
-            for word_info in line:
-                content.append(word_info[1][0])
-        
-        return ParseResult(
-            content=' '.join(content),
-            metadata={
-                "parser": "PaddleOCR",
-                "file_size": os.path.getsize(file_path)
-            },
-            file_type="image"
-        )

+ 15 - 0
schedule-manager/parser/readme.md

@@ -0,0 +1,15 @@
+# 创建并使用buildx构建器(只需执行一次)
+docker buildx create --name mybuilder --use
+# 启动构建器
+docker buildx inspect --bootstrap
+
+
+# 构建并加载到本地(--load 仅支持单平台;此处构建 amd64,如需 arm64 将 --platform 改为 linux/arm64 再构建一次)
+docker buildx build \
+--platform linux/amd64 \
+-t parse:amd64 \
+--load .
+
+
+
+docker run -d -p 8000:8000 --name parser parse:amd64

+ 0 - 165
schedule-manager/parser/utils/ffmpeg_wrapper.py

@@ -1,165 +0,0 @@
-import subprocess
-import os
-from typing import Optional
-try:
-    import cv2
-    import numpy as np
-except Exception:
-    cv2 = None
-    np = None
-
-
-class FFmpegWrapper:
-    """FFmpeg命令行包装工具"""
-    
-    def __init__(self):
-        self.ffmpeg_path = "ffmpeg"  # 假设ffmpeg已在系统PATH中
-    
-    def extract_audio(self, video_path: str, output_audio_path: str) -> bool:
-        """
-        从视频中提取音频轨道
-        
-        Args:
-            video_path: 视频文件路径
-            output_audio_path: 输出音频文件路径
-            
-        Returns:
-            bool: 操作是否成功
-        """
-        try:
-            cmd = [
-                self.ffmpeg_path,
-                "-i", video_path,
-                "-vn",  # 禁用视频
-                "-acodec", "pcm_s16le",  # 16位PCM
-                "-ar", "16000",  # 16kHz采样率
-                "-ac", "1",  # 单声道
-                "-y",  # 覆盖输出文件
-                output_audio_path
-            ]
-            
-            subprocess.run(cmd, check=True, capture_output=True, text=True)
-            return True
-        except subprocess.CalledProcessError as e:
-            raise Exception(f"音频提取失败: {e.stderr}")
-    
-    def convert_audio(self, input_audio_path: str, output_audio_path: str) -> bool:
-        """
-        转换音频格式为16k/16bit/mono wav
-        
-        Args:
-            input_audio_path: 输入音频文件路径
-            output_audio_path: 输出音频文件路径
-            
-        Returns:
-            bool: 操作是否成功
-        """
-        try:
-            cmd = [
-                self.ffmpeg_path,
-                "-i", input_audio_path,
-                "-acodec", "pcm_s16le",
-                "-ar", "16000",
-                "-ac", "1",
-                "-y",
-                output_audio_path
-            ]
-            
-            subprocess.run(cmd, check=True, capture_output=True, text=True)
-            return True
-        except subprocess.CalledProcessError as e:
-            raise Exception(f"音频转换失败: {e.stderr}")
-    
-    def extract_keyframes(self, video_path: str, output_dir: str, interval: int = 60, diff_threshold: Optional[float] = None) -> list:
-        """
-        从视频中提取关键帧
-        
-        Args:
-            video_path: 视频文件路径
-            output_dir: 输出目录
-            interval: 提取间隔(秒)
-            
-        Returns:
-            list: 提取的关键帧文件路径列表
-        """
-        try:
-            # 确保输出目录存在
-            os.makedirs(output_dir, exist_ok=True)
-            
-            # 提取关键帧(按固定频率导出帧)
-            output_pattern = os.path.join(output_dir, "frame_%06d.jpg")
-            cmd = [
-                self.ffmpeg_path,
-                "-i", video_path,
-                "-vf", f"fps=1/{interval}",  # 每 interval 秒一张
-                "-y",
-                output_pattern
-            ]
-            
-            subprocess.run(cmd, check=True, capture_output=True, text=True)
-            
-            # 收集提取的帧
-            frames = []
-            for file in os.listdir(output_dir):
-                if file.startswith("frame_") and file.endswith(".jpg"):
-                    frames.append(os.path.join(output_dir, file))
-
-            frames = sorted(frames)
-
-            # 如果未提供差异阈值,直接返回所有按固定频率提取的帧
-            if diff_threshold is None:
-                return frames
-
-            # 检查依赖
-            if cv2 is None or np is None:
-                raise Exception("OpenCV (opencv-python) 和 numpy 需要安装以启用帧差法(pip install opencv-python numpy)")
-
-            # 使用 OpenCV 的灰度图像计算帧差,比较当前帧与上一个边界帧(pre),当差异>=阈值时标记为关键帧
-            # 首先读取所有帧(彩色),以便能使用 cvtColor 按要求比较
-            imgs = []
-            for frame_path in frames:
-                try:
-                    img = cv2.imread(frame_path)  # BGR
-                    imgs.append(img)
-                except Exception:
-                    imgs.append(None)
-
-            filtered = []
-            # 为了加速计算,统一缩放尺寸 (width, height)
-            resize_to = (320, 240)
-
-            # 找到第一个有效帧作为初始关键帧
-            pre = None
-            for idx, img in enumerate(imgs):
-                if img is not None:
-                    filtered.append(frames[idx])
-                    pre = idx
-                    break
-
-            if pre is None:
-                return []
-
-            # 从下一个帧开始,比较当前帧与 imgs[pre]
-            for i in range(pre + 1, len(imgs)):
-                curr = imgs[i]
-                if curr is None:
-                    continue
-                prev = imgs[pre]
-                try:
-                    prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)
-                    curr_gray = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY)
-                    if resize_to is not None:
-                        prev_gray = cv2.resize(prev_gray, resize_to, interpolation=cv2.INTER_AREA)
-                        curr_gray = cv2.resize(curr_gray, resize_to, interpolation=cv2.INTER_AREA)
-
-                    diff_val = np.mean(np.abs(curr_gray.astype(int) - prev_gray.astype(int)))
-                except Exception:
-                    continue
-
-                if diff_val >= float(diff_threshold):
-                    filtered.append(frames[i])
-                    pre = i
-
-            return filtered
-        except subprocess.CalledProcessError as e:
-            raise Exception(f"关键帧提取失败: {e.stderr}")

+ 0 - 47
schedule-manager/parser/utils/logger.py

@@ -1,47 +0,0 @@
-from loguru import logger
-import os
-
-
-class Logger:
-    """日志管理工具"""
-    
-    def __init__(self, log_file: str = "parsing.log"):
-        """
-        初始化日志配置
-        
-        Args:
-            log_file: 日志文件路径
-        """
-        # 移除默认的控制台输出
-        logger.remove()
-        
-        # 添加控制台输出
-        logger.add(
-            sink=lambda msg: print(msg, end=""),
-            level="INFO",
-            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
-        )
-        
-        # 添加文件输出
-        logger.add(
-            sink=log_file,
-            level="DEBUG",
-            rotation="100 MB",
-            compression="zip",
-            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
-        )
-    
-    @property
-    def log(self):
-        """
-        获取logger实例
-        
-        Returns:
-            logger: loguru logger实例
-        """
-        return logger
-
-
-# 创建全局日志实例
-logger_instance = Logger()
-log = logger_instance.log

+ 0 - 51
schedule-manager/parser/utils/mime_detector.py

@@ -1,51 +0,0 @@
-import filetype
-import os
-
-
-class MimeDetector:
-    """文件MIME类型检测工具"""
-    
-    def __init__(self):
-        pass
-    
-    def detect(self, file_path: str) -> str:
-        """
-        检测文件的MIME类型
-        
-        Args:
-            file_path: 文件路径
-            
-        Returns:
-            str: MIME类型字符串
-        """
-        try:
-            # 使用filetype库检测文件类型
-            kind = filetype.guess(file_path)
-            if kind:
-                return kind.mime
-            else:
-                # 如果filetype无法检测,根据文件扩展名猜测
-                ext = os.path.splitext(file_path)[1].lower()
-                ext_to_mime = {
-                    '.txt': 'text/plain',
-                    '.md': 'text/markdown',
-                    '.pdf': 'application/pdf',
-                    '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
-                    '.doc': 'application/msword',
-                    '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
-                    '.xls': 'application/vnd.ms-excel',
-                    '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
-                    '.ppt': 'application/vnd.ms-powerpoint',
-                    '.jpg': 'image/jpeg',
-                    '.jpeg': 'image/jpeg',
-                    '.png': 'image/png',
-                    '.gif': 'image/gif',
-                    '.wav': 'audio/wav',
-                    '.mp3': 'audio/mpeg',
-                    '.mp4': 'video/mp4',
-                    '.avi': 'video/x-msvideo',
-                    '.mov': 'video/quicktime'
-                }
-                return ext_to_mime.get(ext, 'application/octet-stream')
-        except Exception as e:
-            raise Exception(f"文件类型检测失败: {str(e)}")

+ 0 - 80
schedule-manager/parser/utils/stability.py

@@ -1,80 +0,0 @@
-import asyncio
-import functools
-from typing import Callable, Any, List, Coroutine
-from utils.logger import log
-
-
-def timeout(seconds: int):
-    """
-    超时装饰器,防止函数执行时间过长
-    
-    Args:
-        seconds: 超时时间(秒)
-        
-    Returns:
-        Callable: 装饰后的函数
-    """
-    def decorator(func: Callable) -> Callable:
-        @functools.wraps(func)
-        async def wrapper(*args, **kwargs) -> Any:
-            try:
-                return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
-            except asyncio.TimeoutError:
-                log.error(f"函数 {func.__name__} 执行超时,已超过 {seconds} 秒")
-                raise Exception(f"执行超时,已超过 {seconds} 秒")
-        return wrapper
-    return decorator
-
-
-class AsyncDispatcher:
-    """异步调度器,支持并发处理多个任务"""
-    
-    def __init__(self, max_concurrency: int = 5):
-        """
-        初始化异步调度器
-        
-        Args:
-            max_concurrency: 最大并发数
-        """
-        self.max_concurrency = max_concurrency
-    
-    async def run(self, tasks: List[Coroutine]) -> List[Any]:
-        """
-        并发执行多个任务
-        
-        Args:
-            tasks: 任务列表
-            
-        Returns:
-            List[Any]: 任务执行结果列表
-        """
-        log.info(f"开始并发执行 {len(tasks)} 个任务,最大并发数: {self.max_concurrency}")
-        
-        # 创建信号量控制并发
-        semaphore = asyncio.Semaphore(self.max_concurrency)
-        
-        async def bounded_task(task: Coroutine) -> Any:
-            async with semaphore:
-                try:
-                    return await task
-                except Exception as e:
-                    log.error(f"任务执行失败: {str(e)}")
-                    return None
-        
-        # 并发执行任务
-        results = await asyncio.gather(
-            *[bounded_task(task) for task in tasks],
-            return_exceptions=True
-        )
-        
-        # 处理异常结果
-        processed_results = []
-        for i, result in enumerate(results):
-            if isinstance(result, Exception):
-                log.error(f"第 {i+1} 个任务执行失败: {str(result)}")
-                processed_results.append(None)
-            else:
-                processed_results.append(result)
-        
-        log.info(f"并发任务执行完成,成功 {sum(1 for r in processed_results if r is not None)} 个")
-        return processed_results

+ 3 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/common/PortPool.java

@@ -1,6 +1,7 @@
 package cn.com.yusys.manager.common;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
@@ -26,7 +27,9 @@ public class PortPool {
     private final Map<Integer, String> allocatedPorts = new ConcurrentHashMap<>();
 
     // 端口配置
+    @Value("${parser.instance.port.start:1030}")
     private int portStart = 1030;
+    @Value("${parser.instance.port.end:1050}")
     private int portEnd = 1050;
 
     /**

+ 0 - 4
schedule-manager/src/main/java/cn/com/yusys/manager/config/ParserConfig.java

@@ -32,10 +32,6 @@ public class ParserConfig {
     @Value("${parser.monitor.status-query-fail-count}")
     public int STATUS_QUERY_FAIL_COUNT;
 
-    // Java管理器地址
-    @Value("${parser.manager.url}")
-    public String MANAGER_URL;
-
     // Python实例状态接口路径
     public static final String STATUS_API = "/status";
 

+ 56 - 4
schedule-manager/src/main/java/cn/com/yusys/manager/controller/TaskController.java

@@ -1,7 +1,9 @@
 package cn.com.yusys.manager.controller;
 
+import cn.com.yusys.manager.entity.TaskRecordEntity;
 import cn.com.yusys.manager.model.ExecuteResponse;
 import cn.com.yusys.manager.model.Task;
+import cn.com.yusys.manager.model.TaskLogRequest;
 import cn.com.yusys.manager.model.TaskRecordRequest;
 import cn.com.yusys.manager.service.InstanceMonitorService;
 import cn.com.yusys.manager.service.TaskRecordService;
@@ -11,6 +13,10 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.Resource;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
 @Slf4j
 @RestController
@@ -33,10 +39,10 @@ public class TaskController {
     public ResponseEntity<String> createTaskRecord(@RequestBody TaskRecordRequest taskRecordRequest) {
         try {
             taskRecordService.createTaskRecord(taskRecordRequest);
-            return ResponseEntity.ok("Task record created successfully");
+            return ResponseEntity.ok("任务记录创建成功");
         } catch (Exception e) {
             log.error("创建任务记录失败", e);
-            return ResponseEntity.status(500).body("Failed to create task record: " + e.getMessage());
+            return ResponseEntity.status(500).body("任务记录创建失败: " + e.getMessage());
         }
     }
 
@@ -49,10 +55,10 @@ public class TaskController {
     public ResponseEntity<String> updateTaskStatus(@RequestBody TaskRecordRequest taskRecordRequest) {
         try {
             taskRecordService.updateStatus(taskRecordRequest);
-            return ResponseEntity.ok("Task status updated successfully");
+            return ResponseEntity.ok("任务状态更新成功");
         } catch (Exception e) {
             log.error("更新任务状态失败", e);
-            return ResponseEntity.status(500).body("Failed to update task status: " + e.getMessage());
+            return ResponseEntity.status(500).body("任务状态更新失败: " + e.getMessage());
         }
     }
 
@@ -65,4 +71,50 @@ public class TaskController {
     public ExecuteResponse executeParseTask(@RequestBody Task request) {
         return parserService.processMultimodalTask(request);
     }
+
+    /**
+     * 查询任务记录
+     * @param taskId 任务ID
+     * @return 任务记录
+     */
+    @GetMapping("/{taskId}")
+    public ExecuteResponse getTaskRecord(@PathVariable String taskId) {
+        try {
+            TaskRecordEntity record = taskRecordService.getTaskRecordByTaskId(taskId);
+            if (record != null) {
+                return ExecuteResponse.success(record);
+            } else {
+                return ExecuteResponse.fail("未找到任务记录");
+            }
+        } catch (Exception e) {
+            log.error("查询任务记录失败", e);
+            return ExecuteResponse.fail("查询任务记录失败: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 根据日志路径查询日志内容
+     * @param taskLogRequest 日志查询请求对象,包含日志路径
+     * @return 日志内容
+     */
+    @PostMapping("/log")
+    public ExecuteResponse getLog(@RequestBody TaskLogRequest taskLogRequest) {
+        try {
+            String path = taskLogRequest.getLogPath();
+            // 验证路径:规范化后必须仍位于 logs/tasks/ 下且以 .log 结尾,防止 "../" 路径穿越
+            Path logPath = (path == null) ? null : Paths.get(path).normalize();
+            if (logPath == null || !path.endsWith(".log") || !logPath.startsWith(Paths.get("logs/tasks"))) {
+                return ExecuteResponse.fail("无效的日志路径");
+            }
+            if (!Files.exists(logPath)) {
+                return ExecuteResponse.fail("日志文件不存在");
+            }
+            byte[] bytes = Files.readAllBytes(logPath);
+            String content = new String(bytes, StandardCharsets.UTF_8);
+            return ExecuteResponse.success(content);
+        } catch (Exception e) {
+            log.error("读取日志失败", e);
+            return ExecuteResponse.fail("读取日志失败: " + e.getMessage());
+        }
+    }
 }

+ 8 - 6
schedule-manager/src/main/java/cn/com/yusys/manager/instanceManager/Impl/ProcessInstanceManager.java

@@ -1,23 +1,25 @@
 package cn.com.yusys.manager.instanceManager.Impl;
 
-import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
 import cn.com.yusys.manager.instanceManager.InstanceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 public class ProcessInstanceManager implements InstanceManager {
 

+ 4 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceManagementResponse.java

@@ -81,6 +81,10 @@ public class InstanceManagementResponse {
          * 上次心跳时间
          */
         private Long lastHeartbeatTime;
+
+
+        //上次空闲时间
+        private Long lastIdleTime;
     }
 
     /**

+ 3 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceStatus.java

@@ -39,6 +39,9 @@ public class InstanceStatus {
     //上次心跳时间
     private Long lastHeartbeatTime;
 
+    //上次空闲时间
+    private Long lastIdleTime;
+
     //查询失败次数
     private Integer statusQueryFailCount;
 

+ 14 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/TaskLogRequest.java

@@ -0,0 +1,14 @@
+package cn.com.yusys.manager.model;
+
+
+import lombok.Data;
+
+/**
+ * 任务日志请求类
+ * 用于封装任务日志相关的请求参数
+ */
+@Data
+public class TaskLogRequest {
+
+    private String logPath;  // 日志文件路径,用于指定要获取或操作的日志文件位置
+}

+ 123 - 147
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -56,10 +56,6 @@ public class InstanceMonitorService {
     @Resource
     private TaskRecordService taskRecordService;
 
-    // 最大重试次数
-    @org.springframework.beans.factory.annotation.Value("${parser.task.max-retry:3}")
-    private int maxRetry;
-
     @PostConstruct
     public void initParseInstance(){
         log.info("开始初始化解析实例...");
@@ -71,7 +67,7 @@ public class InstanceMonitorService {
                 String instanceId = instanceManager.startParseInstance(port);
                 // 增加容器ID空值校验
                 if (instanceId == null || instanceId.isEmpty()) {
-                    log.error("初始化实例失败:进程创建失败,端口:{}", port);
+                    log.error("初始化实例失败:实例创建失败,端口:{}", port);
                     portPool.releasePort(port); // 归还端口
                     continue;
                 }
@@ -104,8 +100,8 @@ public class InstanceMonitorService {
             // 3. 校验活跃实例数,触发实例拉起
             checkAndSpinUpInstance();
 
-            // 4. 根据GPU负载动态扩缩容
-            checkAndScaleUpByGpuLoad();
+            // 4. 检查并缩容空闲实例
+            checkAndScaleDownIdleInstance();
 
         } catch (Exception e) {
             log.error("解析实例监控任务执行失败", e);
@@ -118,7 +114,6 @@ public class InstanceMonitorService {
     private void checkHeartbeatTimeout() {
         long now = System.currentTimeMillis();
         Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
-        // 复制为新集合,避免遍历中修改原集合
         Map<String, InstanceStatus> copyPool = new HashMap<>(activeInstancePool);
 
         copyPool.forEach((instanceId, state) -> {
@@ -184,6 +179,9 @@ public class InstanceMonitorService {
                     instancestatus.setGpuUsage(data.getGpuUsage());
                     instancestatus.setLastHeartbeatTime(System.currentTimeMillis());
                     instancestatus.setGpuMemory(data.getGpuMemory());
+                    if (data.getStatus() == 0) {
+                        instancestatus.setLastIdleTime(System.currentTimeMillis());
+                    }
                     log.info("实例{}状态查询成功,状态:{}", instanceId, data.getStatus());
 
                 } else {
@@ -268,53 +266,45 @@ public class InstanceMonitorService {
     }
 
     /**
-     * 根据GPU负载检测并扩缩容实例
+     * 检查并缩容空闲实例
      */
-    private void checkAndScaleUpByGpuLoad() {
-        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
-
-        // 计算所有活跃实例的平均GPU负载
-        double avgGpuLoad = calculateAverageGpuLoad(activeInstancePool);
-        log.info("当前平均GPU负载:{}%,阈值:{}%", avgGpuLoad, parserConfig.GPU_LOAD_THRESHOLD);
+    private void checkAndScaleDownIdleInstance() {
+        int currentActiveNum = getEffectiveActiveInstanceNum();
+        if (currentActiveNum > parserConfig.MIN_ACTIVE_INSTANCE) {
+            Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
 
-        // 判断是否需要扩容
-        if (avgGpuLoad < parserConfig.GPU_LOAD_THRESHOLD) {
-            int currentActiveNum = getEffectiveActiveInstanceNum();
+            // 筛选出空闲的实例,按 lastIdleTime 升序排序(最久空闲的在前)
+            List<InstanceStatus> idleInstances = activeInstancePool.values().stream()
+                    .filter(status -> status.getStatus() == 0 && status.getLastIdleTime() != null)
+                    .sorted(Comparator.comparingLong(InstanceStatus::getLastIdleTime))
+                    .collect(Collectors.toList());
 
-            // 检查是否可以扩容
-            if (currentActiveNum < parserConfig.MAX_ACTIVE_INSTANCE) {
-                ScaleUpInstance(currentActiveNum);
-            } else {
-                log.warn("GPU资源充足但已达到最大实例数({}),无法继续扩容", parserConfig.MAX_ACTIVE_INSTANCE);
-            }
-        }
+            if (!idleInstances.isEmpty()) {
+                InstanceStatus instanceToClose = idleInstances.get(0); // 最久空闲的
+                String instanceId = instanceToClose.getInstanceId();
 
-        //判断是否需要缩容
-        if (avgGpuLoad > parserConfig.GPU_LOAD_THRESHOLD) {
-            int currentActiveNum = getEffectiveActiveInstanceNum();
+                try {
+                    log.info("缩容空闲实例:{},端口:{}", instanceId, instanceToClose.getPort());
+
+                    // 标记实例为不可用(复用失联状态值 2),防止缩容过程中被再次分配任务
+                    instanceToClose.setStatus(2);
+                    // 从活跃实例池移除
+                    activeInstancePool.remove(instanceId);
+                    // 释放端口
+                    portPool.releasePort(instanceToClose.getPort());
+                    // 关闭进程
+                    instanceManager.terminateInstance(instanceId);
 
-            // 检查是否可以缩容
-            if (currentActiveNum > parserConfig.MIN_ACTIVE_INSTANCE){
-                int needScaleNum = Math.min(parserConfig.GPU_SCALE_INSTANCE_NUM, currentActiveNum - parserConfig.MIN_ACTIVE_INSTANCE);
-                ScaleDownInstance(needScaleNum, activeInstancePool);
+                    log.info("缩容实例{}成功,已关闭进程并释放端口{}", instanceId, instanceToClose.getPort());
+                } catch (Exception e) {
+                    log.error("缩容实例{}失败", instanceId, e);
+                    instanceToClose.setStatus(0); // 缩容失败:恢复为空闲并重新加入活跃池(注意:端口此前可能已释放,存在被新实例复用的风险)
+                    activeInstancePool.put(instanceId, instanceToClose);
+                }
             }
         }
     }
 
-    /**
-     * 计算所有活跃实例的平均GPU负载
-     * @param activeInstancePool 活跃实例池
-     * @return 平均GPU负载(百分比)
-     */
-    private double calculateAverageGpuLoad(Map<String, InstanceStatus> activeInstancePool) {
-        return activeInstancePool.values().stream()
-                .filter(status -> status.getStatus() == 1 || status.getStatus() == 0) // 只统计正常状态的实例
-                .filter(status -> status.getGpuUsage() != null) // 过滤掉GPU使用率为null的实例
-                .mapToDouble(InstanceStatus::getGpuUsage)
-                .average()
-                .orElse(0.0);
-    }
-
     //保存实例状态
     private InstanceStatus saveInstanceStatus(String instanceId, Integer port) {
         InstanceStatus instanceStatus = new InstanceStatus();
@@ -323,6 +313,7 @@ public class InstanceMonitorService {
         instanceStatus.setLastHeartbeatTime(System.currentTimeMillis());
         instanceStatus.setStatus(0);
         instanceStatus.setInstanceId(instanceId);
+        instanceStatus.setLastIdleTime(System.currentTimeMillis());
         // 获取并设置进程PID
         Long pid = instanceManager.getPid(instanceId);
         instanceStatus.setPid(pid);
@@ -332,77 +323,8 @@ public class InstanceMonitorService {
         return instanceStatus;
     }
 
-    //增加实例
-    private void ScaleUpInstance(int currentActiveNum){
-        int needCreateNum = parserConfig.GPU_SCALE_INSTANCE_NUM;
-        // 防止超过最大实例数
-        needCreateNum = Math.min(needCreateNum, parserConfig.MAX_ACTIVE_INSTANCE - currentActiveNum);
-
-        log.info("需要扩容{}个解析实例", needCreateNum);
-
-        for (int i = 0; i < needCreateNum; i++) {
-            // 使用PortPool分配端口
-            Integer port = portPool.allocatePort();
-            if(port != null){
-                String instanceId = instanceManager.startParseInstance(port);
-                // 增加实例ID空值校验
-                if (instanceId == null || instanceId.isEmpty()) {
-                    log.error("GPU扩容实例失败:进程创建失败,端口:{}", port);
-                    portPool.releasePort(port); // 归还端口
-                    continue;
-                }
-                InstanceStatus instanceStatus = saveInstanceStatus(instanceId, port);
-                log.info("基于GPU负载扩容,已创建实例,实例ID:{},端口:{}", instanceId, port);
-            } else {
-                log.warn("端口池已满,无法继续GPU扩容");
-                break;
-            }
-        }
-    }
-    /**
-     * 执行缩容操作:优先关闭负载最低的实例
-     * @param needDownNum 需要缩容的实例数
-     * @param activeInstancePool 活跃实例池
-     */
-    private void ScaleDownInstance(int needDownNum, Map<String, InstanceStatus> activeInstancePool) {
-        // 1. 筛选出正常运行的实例,并按GPU负载升序排序(负载最低的优先关闭)
-        List<Map.Entry<String, InstanceStatus>> sortedInstances = activeInstancePool.entrySet().stream()
-                .filter(entry -> entry.getValue().getStatus() == 1 || entry.getValue().getStatus() == 0) // 仅正常实例
-                .filter(entry -> entry.getValue().getGpuUsage() != null) // 有GPU负载数据
-                .sorted(Comparator.comparingDouble(entry -> entry.getValue().getGpuUsage())) // 升序排序
-                .limit(needDownNum) // 只取需要缩容的数量
-                .collect(Collectors.toList());
-
-        // 2. 逐个关闭实例
-        for (Map.Entry<String, InstanceStatus> entry : sortedInstances) {
-            String instanceId = entry.getKey();
-            InstanceStatus instanceStatus = entry.getValue();
-
-            log.info("开始缩容实例:{},GPU负载:{}%,端口:{}",
-                    instanceId, instanceStatus.getGpuUsage(), instanceStatus.getPort());
-
-            try {
-                // 标记实例为失联
-                instanceStatus.setStatus(2);
-                // 从活跃实例池移除
-                activeInstancePool.remove(instanceId);
-                // 释放端口
-                portPool.releasePort(instanceStatus.getPort());
-                // 关闭进程
-                instanceManager.terminateInstance(instanceId);
-
-                log.info("缩容实例{}成功,已关闭进程并释放端口{}", instanceId, instanceStatus.getPort());
-            } catch (Exception e) {
-                log.error("缩容实例{}失败", instanceId, e);
-                // 缩容失败时,将实例重新加入活跃池(避免端口丢失)
-                activeInstancePool.put(instanceId, instanceStatus);
-            }
-        }
-    }
-
     /**
      * 执行多模态任务解析任务,阻塞调用底层解析器执行任务
-     * 失败任务重试maxRetry次后,转入失败Topic
      */
    public ExecuteResponse processMultimodalTask(Task task) {
         Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
@@ -421,12 +343,43 @@ public class InstanceMonitorService {
             //  检查是否有空闲的解析实例
             InstanceStatus idleInstance = findIdleInstance(activeInstancePool);
             if (idleInstance == null) {
-                log.debug("当前无空闲解析实例");
-                taskLogService.logTaskFailure(taskId, "当前无空闲解析实例");
-                // 更新任务状态为解析失败
-                taskRecordRequest.setStatus(4);
-                taskRecordService.updateStatus(taskRecordRequest);
-                return ExecuteResponse.fail(300,"当前无空闲解析实例");
+                // 检查是否可以扩容
+                int currentActiveNum = getEffectiveActiveInstanceNum();
+                if (currentActiveNum < parserConfig.MAX_ACTIVE_INSTANCE) {
+                    double avgGpuLoad = calculateAverageGpuLoad(activeInstancePool);
+                    if (avgGpuLoad < parserConfig.GPU_LOAD_THRESHOLD) {
+                        log.info("无空闲实例,GPU资源充足,扩容一个实例");
+                        ScaleUpOneInstance();
+                        // 重新查找空闲实例
+                        idleInstance = findIdleInstance(activeInstancePool);
+                        // 等待实例启动,最多10秒
+                        long startTime = System.currentTimeMillis();
+                        while (System.currentTimeMillis() - startTime < 10000) {
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                break;
+                            }
+                            idleInstance = findIdleInstance(activeInstancePool);
+                            if (idleInstance != null) {
+                                log.info("新扩容实例已启动并空闲,开始处理任务");
+                                break;
+                            }
+                        }
+                        if (idleInstance == null) {
+                            log.warn("扩容后等待10秒仍无空闲实例,任务失败");
+                        }
+                    }
+                }
+                if (idleInstance == null) {
+                    log.debug("当前无空闲解析实例");
+                    taskLogService.logTaskFailure(taskId, "当前无空闲解析实例");
+                    // 更新任务状态为解析失败
+                    taskRecordRequest.setStatus(4);
+                    taskRecordService.updateStatus(taskRecordRequest);
+                    return ExecuteResponse.fail(300,"当前无空闲解析实例");
+                }
             }
             
             // 更新任务状态为解析中
@@ -437,7 +390,7 @@ public class InstanceMonitorService {
             taskLogService.logInstanceAllocation(taskId, idleInstance.getInstanceId());
 
             //  执行任务解析
-            ExecuteResponse response = executeTaskWithRetry(idleInstance, task);
+            ExecuteResponse response = executeTask(idleInstance, task);
             
             // 根据执行结果更新任务状态
             if ( response.getCode() == 200) {
@@ -481,41 +434,33 @@ public class InstanceMonitorService {
 
 
     /**
-     * 执行任务并处理重试逻辑
+     * 执行任务
      */
-    private ExecuteResponse executeTaskWithRetry(InstanceStatus instance, Task task) {
+    private ExecuteResponse executeTask(InstanceStatus instance, Task task) {
         String instanceId = instance.getInstanceId();
-        int retryCount = 0;
         // 标记实例为运行中
         instance.setStatus(1);
 
         try {
-            while (retryCount <= maxRetry ) {
-                try {
-                    log.info("开始执行任务,实例:{},重试次数:{}/{},任务内容:{}", 
-                            instanceId, retryCount, maxRetry, task.getFilePath());
-
-                    // 调用解析器执行任务
-                    ExecuteResponse response = callParser(instance, task);
-
-                    if (response != null && response.getCode() == 200) {
-                        log.info("任务执行成功,实例:{},响应:{}", instanceId, response);
-                        return response;
-                    } else {
-                        log.warn("任务执行返回失败,实例:{},响应:{},准备重试", instanceId, response);
-                        retryCount++;
-                    }
-                } catch (Exception e) {
-                    log.error("任务执行异常,实例:{},重试次数:{}/{}", instanceId, retryCount, maxRetry, e);
-                    retryCount++;
-                }
+            log.info("开始执行任务,实例:{},任务内容:{}", instanceId, task.getFilePath());
 
-            }
+            // 调用解析器执行任务
+            ExecuteResponse response = callParser(instance, task);
 
-            return ExecuteResponse.fail("任务执行失败,已达最大重试次数");
+            if (response != null && response.getCode() == 200) {
+                log.info("任务执行成功,实例:{},响应:{}", instanceId, response.getMessage());
+                return response;
+            } else {
+                log.warn("任务执行失败,实例:{},响应:{}", instanceId, response);
+                return ExecuteResponse.fail("任务执行失败");
+            }
+        } catch (Exception e) {
+            log.error("任务执行异常,实例:{}", instanceId, e);
+            return ExecuteResponse.fail("任务执行异常: " + e.getMessage());
         } finally {
             // 恢复实例状态为空闲
             instance.setStatus(0);
+            instance.setLastIdleTime(System.currentTimeMillis());
         }
     }
 
@@ -569,6 +514,7 @@ public class InstanceMonitorService {
                             .memoryUsage(status.getMemoryUsage())
                             .gpuUsage(status.getGpuUsage())
                             .gpuMemory(status.getGpuMemory())
+                            .lastIdleTime(status.getLastIdleTime())
                             .lastHeartbeatTime(status.getLastHeartbeatTime())
                             .build())
                     .collect(Collectors.toList());
@@ -619,5 +565,35 @@ public class InstanceMonitorService {
         }
     }
 
-}
+    /**
+     * 计算平均GPU负载
+     */
+    private double calculateAverageGpuLoad(Map<String, InstanceStatus> activeInstancePool) {
+        return activeInstancePool.values().stream()
+                .filter(status -> status.getGpuUsage() != null)
+                .mapToDouble(InstanceStatus::getGpuUsage)
+                .average()
+                .orElse(0.0);
+    }
 
+    /**
+     * 扩容一个实例
+     */
+    private void ScaleUpOneInstance() {
+        // 使用PortPool分配端口
+        Integer port = portPool.allocatePort();
+        if(port != null){
+            String instanceId = instanceManager.startParseInstance(port);
+            // 增加实例ID空值校验
+            if (instanceId == null || instanceId.isEmpty()) {
+                log.error("扩容实例失败:实例创建失败,端口:{}", port);
+                portPool.releasePort(port); // 归还端口
+                return;
+            }
+            InstanceStatus instanceStatus = saveInstanceStatus(instanceId, port);
+            log.info("扩容,已创建实例,实例ID:{},端口:{}", instanceId, port);
+        } else {
+            log.warn("端口池已满,无法扩容");
+        }
+    }
+}

+ 2 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/TaskRecordService.java

@@ -9,4 +9,6 @@ public interface TaskRecordService extends IService<TaskRecordEntity> {
     TaskRecordEntity createTaskRecord(TaskRecordRequest taskRecordRequest);
 
     void updateStatus(TaskRecordRequest taskRecordRequest);
+
+    TaskRecordEntity getTaskRecordByTaskId(String taskId);
 }

+ 7 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/impl/TaskRecordServiceImpl.java

@@ -40,4 +40,11 @@ public class TaskRecordServiceImpl extends ServiceImpl<TaskRecordMapper, TaskRec
             throw new RuntimeException("Task not found: " + taskRecordRequest.getTaskId());
         }
     }
+
+    @Override
+    public TaskRecordEntity getTaskRecordByTaskId(String taskId) {
+        QueryWrapper<TaskRecordEntity> wrapper = new QueryWrapper<>();
+        wrapper.eq("task_id", taskId);
+        return getOne(wrapper);
+    }
 }

+ 8 - 8
schedule-manager/src/main/resources/application.yml

@@ -1,5 +1,5 @@
 server:
-  port: 8083
+  port: 1083
 
 spring:
   application:
@@ -22,10 +22,15 @@ parser:
     python-path: parser/venv/bin/python
     # 工作目录
     work-dir: .
-    image: parse-service:latest
+    image: parse-instance:amd64
     #实例启动方式
     type: docker
 
+    #实例启动端口池
+    port:
+      start: 1030
+      end: 1050
+
 
   
   # 监控参数配置
@@ -37,7 +42,7 @@ parser:
     min-active-instance: 3
 
     # 最大活跃实例数,资源上限
-    max-active-instance: 4
+    max-active-instance: 6
 
     # 任务积压阈值,触发临时扩容的 Kafka Lag 阈值 (> 100)
     task-backlog-threshold: 100
@@ -54,9 +59,4 @@ parser:
     #GPU负载扩容实例数
     gpu-scale-instance-num: 1
 
-  # Java 管理器配置
-  manager:
-    # Java 管理端的访问地址(用于 Python 实例注册回调)
-    url: http://localhost:8080
-
 

+ 22 - 0
schedule-producer/pom.xml

@@ -63,6 +63,28 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <!-- SpringBoot测试核心依赖 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- Junit5(SpringBoot 2.4+默认集成) -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 1
schedule-producer/src/main/resources/application.yml

@@ -1,5 +1,5 @@
 server:
-  port: 8081
+  port: 1081
 
 spring:
   application:

+ 30 - 0
schedule-producer/src/test/java/cn/com/yusys/producer/service/MessageSenderTest.java

@@ -0,0 +1,30 @@
+package cn.com.yusys.producer.service;
+
+import cn.com.yusys.producer.ProducerApplication;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+
+@SpringBootTest(classes = ProducerApplication.class)
+@RunWith(SpringJUnit4ClassRunner.class)
+public class MessageSenderTest {
+
+
+    @Autowired
+    private MessageSender messageSender;
+
+
+    @Test
+    public void testSend() {
+        String []topics = new String[]{"task_post_loan_medium","task_voucher_medium","task_contract_high"};
+
+        for (String topic : topics) {
+            for (int i = 0; i < 20; i++) {
+                messageSender.send(topic, null, "./examples/test3.png");
+            }
+        }
+    }
+}

+ 1 - 1
schedule-producer/src/test/java/util/FileTypeDetectorTest.java → schedule-producer/src/test/java/cn/com/yusys/producer/service/util/FileTypeDetectorTest.java

@@ -1,4 +1,4 @@
-package util;
+package cn.com.yusys.producer.service.util;
 
 import cn.com.yusys.producer.util.FileMetadataUtil;
 import cn.com.yusys.producer.util.FileTypeDetector;