1
0

3 کامیت‌ها 893784917c ... dd9f17c205

نویسنده SHA1 پیام تاریخ
  zsh dd9f17c205 增加日志和状态查询接口 3 هفته پیش
  zsh c083ce536e 加入解析器代码至仓库 3 هفته پیش
  zsh 4a3994c9ef 增加模块之间调用的接口 3 هفته پیش
38فایلهای تغییر یافته به همراه2821 افزوده شده و 125 حذف شده
  1. 2 1
      .gitignore
  2. 15 3
      README.md
  3. 55 0
      parser/.dockerignore
  4. 342 0
      parser/core/router.py
  5. 38 26
      parser/dockerfile
  6. BIN
      parser/examples/test1.pdf
  7. 25 0
      parser/models/result.py
  8. 26 79
      parser/parse_service.py
  9. 137 0
      parser/parsers/audio_parser.py
  10. 282 0
      parser/parsers/native_parser.py
  11. 43 0
      parser/parsers/text_parser.py
  12. 151 0
      parser/parsers/video_parser.py
  13. 336 0
      parser/parsers/visual_parser.py
  14. 25 2
      parser/requirements.txt
  15. 165 0
      parser/utils/ffmpeg_wrapper.py
  16. 47 0
      parser/utils/logger.py
  17. 51 0
      parser/utils/mime_detector.py
  18. 80 0
      parser/utils/stability.py
  19. 5 0
      schedule-consumer/pom.xml
  20. 1 1
      schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java
  21. 42 6
      schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java
  22. 20 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/model/Task.java
  23. 88 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java
  24. 24 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/ExecuteResponse.java
  25. 40 0
      schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/InstanceStatusResponse.java
  26. 4 1
      schedule-consumer/src/main/resources/application.yml
  27. 5 0
      schedule-manager/pom.xml
  28. 66 0
      schedule-manager/src/main/java/cn/com/yusys/manager/controller/ManagerController.java
  29. 32 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/ExecuteResponse.java
  30. 32 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceConfigRequest.java
  31. 118 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceManagementResponse.java
  32. 22 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/Task.java
  33. 60 0
      schedule-manager/src/main/java/cn/com/yusys/manager/model/TaskLogResponse.java
  34. 208 4
      schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java
  35. 177 0
      schedule-manager/src/main/java/cn/com/yusys/manager/service/TaskLogService.java
  36. 53 0
      schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java
  37. 3 1
      schedule-manager/src/main/resources/application.yml
  38. 1 1
      schedule-producer/src/main/resources/application.yml

+ 2 - 1
.gitignore

@@ -5,7 +5,8 @@ target/
 !**/src/main/**/target/
 !**/src/test/**/target/
 .kotlin
-
+**.log
+logs/
 ### IntelliJ IDEA ###
 .idea/modules.xml
 .idea/jarRepositories.xml

+ 15 - 3
README.md

@@ -125,15 +125,27 @@ java -jar schedule-consumer/target/schedule-consumer.jar
 # 启动Manager
 java -jar schedule-manager/target/schedule-manager.jar
 ```
+解析服务镜像打包(需在 parser 目录下执行,该目录包含 dockerfile 与 requirements.txt):
+```bash
+docker build -t parse-service:latest  .
+```
 
-## 使用指南
+## 测试指南
 
 ### 发送任务
 
 通过Producer模块提供的RESTful API发送任务:
 
-```bash
-curl -X POST http://localhost:8081/api/task   -H "Content-Type: application/json"   -d '{"taskData": "your task data"}'
+```
+POST http://localhost:8081/api/send
+
+测试数据
+{
+    "topic":"schedule-topic",
+    "key":"1",
+    "message":"./examples/test1.pdf"
+}
+
 ```
 
 ### 监控任务

+ 55 - 0
parser/.dockerignore

@@ -0,0 +1,55 @@
+# Git相关
+.git
+.gitignore
+.gitattributes
+
+# Python相关
+__pycache__
+*.pyc
+*.pyo
+*.pyd
+.Python
+*.egg-info/
+dist/
+build/
+*.egg
+.pytest_cache/
+.coverage
+htmlcov/
+
+# 虚拟环境
+venv/
+env/
+ENV/
+.venv
+
+# IDE相关
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+.DS_Store
+
+# 输出文件
+output/
+logs/
+*.log
+
+# 临时文件
+tmp/
+temp/
+*.tmp
+
+# 文档
+*.md
+doc/
+docs/
+
+# 开发工具
+.editorconfig
+.pre-commit-config.yaml
+Makefile
+
+# 不需要的目录
+.github/

+ 342 - 0
parser/core/router.py

@@ -0,0 +1,342 @@
+from typing import Optional, Type
+from abc import ABC, abstractmethod
+import fitz  # PyMuPDF
+from utils.mime_detector import MimeDetector
+from utils.logger import log
+from models.result import ParseResult
+
+
+class Parser(ABC):
+    """解析器基类"""
+    
+    @abstractmethod
+    async def parse(self, file_path: str) -> ParseResult:
+        """
+        解析文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        pass
+    
+    def _get_file_size(self, file_path: str) -> int:
+        """
+        获取文件大小
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            int: 文件大小(字节)
+        """
+        import os
+        try:
+            return os.path.getsize(file_path)
+        except Exception:
+            return 0
+
+
+class ParserFactory:
+    """解析器工厂类"""
+    
+    def __init__(self):
+        self.mime_detector = MimeDetector()
+        self.parsers = {}
+        self.parser_instances = {}  # 缓存解析器实例
+        # 统计信息
+        self.stats = {
+            'total_files': 0,
+            'total_size': 0,
+            'text_files': 0,
+            'text_size': 0,
+            'image_files': 0,
+            'image_size': 0,
+            'audio_files': 0,
+            'audio_size': 0,
+            'video_files': 0,
+            'video_size': 0,
+            'pdf_files': 0,
+            'pdf_size': 0,
+            'office_files': 0,
+            'office_size': 0,
+            'total_time': 0,
+            'successful_files': 0,
+            'failed_files': 0
+        }
+    
+    def register_parser(self, mime_type: str, parser_class: Type[Parser]):
+        """
+        注册解析器
+        
+        Args:
+            mime_type: MIME类型
+            parser_class: 解析器类
+        """
+        self.parsers[mime_type] = parser_class
+    
+    async def get_parser(self, file_path: str) -> Parser:
+        """
+        根据文件类型和内容特征获取合适的解析器
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            Parser: 解析器实例
+        """
+        log.info(f"开始获取解析器,文件路径: {file_path}")
+        # 1. 检测文件MIME类型
+        mime_type = self.mime_detector.detect(file_path)
+        log.info(f"文件MIME类型: {mime_type}")
+        
+        # 2. 第一层路由:根据MIME类型分流
+        if mime_type.startswith("text/"):
+            log.info("检测到文本文件,使用TextParser")
+            if "TextParser" not in self.parser_instances:
+                from parsers.text_parser import TextParser
+                self.parser_instances["TextParser"] = TextParser()
+            return self.parser_instances["TextParser"]
+        elif mime_type.startswith("image/"):
+            log.info("检测到图片文件,使用VisualDocParser")
+            if "VisualDocParser" not in self.parser_instances:
+                from parsers.visual_parser import VisualDocParser
+                self.parser_instances["VisualDocParser"] = VisualDocParser()
+            return self.parser_instances["VisualDocParser"]
+        elif mime_type.startswith("audio/"):
+            log.info("检测到音频文件,使用AudioParser")
+            if "AudioParser" not in self.parser_instances:
+                from parsers.audio_parser import AudioParser
+                self.parser_instances["AudioParser"] = AudioParser()
+            return self.parser_instances["AudioParser"]
+        elif mime_type.startswith("video/"):
+            log.info("检测到视频文件,使用VideoParser")
+            if "VideoParser" not in self.parser_instances:
+                from parsers.video_parser import VideoParser
+                self.parser_instances["VideoParser"] = VideoParser()
+            return self.parser_instances["VideoParser"]
+        elif mime_type == "application/pdf":
+            # 3. 第二层路由:PDF特殊处理
+            log.info("检测到PDF文件,进入特殊路由")
+            return await self._route_pdf(file_path)
+        elif "openxmlformats" in mime_type or mime_type == "application/msword":
+            # Office文件处理(包括docx和doc)
+            log.info(f"检测到Office文件,MIME类型: {mime_type},使用NativeDocParser")
+            return await self._route_office(file_path, mime_type)
+        else:
+            log.error(f"不支持的文件类型: {mime_type}")
+            raise Exception(f"不支持的文件类型: {mime_type}")
+    
+    async def _route_pdf(self, file_path: str) -> Parser:
+        """
+        PDF文件路由逻辑
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            Parser: 解析器实例
+        """
+        # 检测PDF是否为扫描件(文本密度检测)
+        if self._is_scanned_pdf(file_path):
+            log.info("PDF为扫描件,使用VisualDocParser")
+            if "VisualDocParser" not in self.parser_instances:
+                from parsers.visual_parser import VisualDocParser
+                self.parser_instances["VisualDocParser"] = VisualDocParser()
+            return self.parser_instances["VisualDocParser"]
+        else:
+            log.info("PDF为原生文档,使用NativeDocParser")
+            if "NativeDocParser" not in self.parser_instances:
+                from parsers.native_parser import NativeDocParser
+                self.parser_instances["NativeDocParser"] = NativeDocParser()
+            return self.parser_instances["NativeDocParser"]
+    
+    async def _route_office(self, file_path: str, mime_type: str) -> Parser:
+        """
+        Office文件路由逻辑
+        
+        Args:
+            file_path: 文件路径
+            mime_type: MIME类型
+            
+        Returns:
+            Parser: 解析器实例
+        """
+        if "NativeDocParser" not in self.parser_instances:
+            from parsers.native_parser import NativeDocParser
+            self.parser_instances["NativeDocParser"] = NativeDocParser()
+        return self.parser_instances["NativeDocParser"]
+    
+    def _is_scanned_pdf(self, file_path: str) -> bool:
+        """
+        检测PDF是否为扫描件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            bool: 是否为扫描件
+        """
+        try:
+            doc = fitz.open(file_path)
+            text_content = ""
+            # 提取前3页文本
+            for page_num in range(min(3, len(doc))):
+                page = doc[page_num]
+                text_content += page.get_text()
+            doc.close()
+            
+            # 计算有效字符数
+            valid_chars = len([c for c in text_content if c.isalnum() or c.isspace()])
+            log.info(f"PDF前3页有效字符数: {valid_chars}")
+            
+            # 如果有效字符数少于50,认为是扫描件
+            return valid_chars < 50
+        except Exception as e:
+            log.error(f"PDF文本提取失败: {str(e)}")
+            # 提取失败时默认使用VisualDocParser
+            return True
+    
+    async def parse(self, file_path: str) -> ParseResult:
+        """
+        解析文件的入口方法
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        import time
+        import os
+        
+        start_time = time.time()
+        file_size = 0
+        
+        try:
+            file_size = os.path.getsize(file_path)
+        except Exception:
+            pass
+        
+        log.info(f"开始解析文件: {file_path}, 文件大小: {file_size / (1024 * 1024):.2f} MB")
+        
+        try:
+            parser = await self.get_parser(file_path)
+            log.info(f"获取到解析器: {parser.__class__.__name__}")
+            
+            result = await parser.parse(file_path)
+            
+            end_time = time.time()
+            elapsed_time = end_time - start_time
+            
+            # 更新统计信息
+            self.stats['total_files'] += 1
+            self.stats['total_size'] += file_size
+            self.stats['total_time'] += elapsed_time
+            self.stats['successful_files'] += 1
+            
+            # 根据文件类型更新统计
+            file_type = result.file_type
+            if file_type.startswith('text'):
+                self.stats['text_files'] += 1
+                self.stats['text_size'] += file_size
+            elif file_type.startswith('image') or file_type == 'visual':
+                self.stats['image_files'] += 1
+                self.stats['image_size'] += file_size
+            elif file_type.startswith('audio'):
+                self.stats['audio_files'] += 1
+                self.stats['audio_size'] += file_size
+            elif file_type.startswith('video'):
+                self.stats['video_files'] += 1
+                self.stats['video_size'] += file_size
+            elif file_type == 'pdf' or file_type == 'pdf_scanned':
+                self.stats['pdf_files'] += 1
+                self.stats['pdf_size'] += file_size
+            elif file_type == 'office':
+                self.stats['office_files'] += 1
+                self.stats['office_size'] += file_size
+            
+            # 解析结果日志
+            log.info(f"文件解析完成,耗时: {elapsed_time:.2f} 秒")
+            log.info(f"文件类型: {result.file_type}")
+            log.info(f"解析内容长度: {len(result.content)} 字符")
+            log.info(f"元数据: {result.metadata}")
+            if result.tables:
+                log.info(f"提取到表格数量: {len(result.tables)}")
+            
+            return result
+        except Exception as e:
+            end_time = time.time()
+            elapsed_time = end_time - start_time
+            
+            # 更新统计信息
+            self.stats['total_files'] += 1
+            self.stats['total_size'] += file_size
+            self.stats['total_time'] += elapsed_time
+            self.stats['failed_files'] += 1
+            
+            log.error(f"解析失败: {str(e)}, 耗时: {elapsed_time:.2f} 秒")
+            # 返回错误结果
+            return ParseResult(
+                content=f"解析失败: {str(e)}",
+                metadata={"error": str(e)},
+                file_type="error"
+            )
+    
+    def generate_performance_report(self) -> str:
+        """
+        生成性能报告
+        
+        Returns:
+            str: 性能报告
+        """
+        stats = self.stats
+        
+        # 计算各项指标
+        total_files = stats['total_files']
+        total_size = stats['total_size']
+        total_time = stats['total_time']
+        successful_files = stats['successful_files']
+        failed_files = stats['failed_files']
+        
+        # 计算各类文件占比
+        text_ratio = (stats['text_size'] / total_size * 100) if total_size > 0 else 0
+        image_ratio = (stats['image_size'] / total_size * 100) if total_size > 0 else 0
+        audio_ratio = (stats['audio_size'] / total_size * 100) if total_size > 0 else 0
+        video_ratio = (stats['video_size'] / total_size * 100) if total_size > 0 else 0
+        pdf_ratio = (stats['pdf_size'] / total_size * 100) if total_size > 0 else 0
+        office_ratio = (stats['office_size'] / total_size * 100) if total_size > 0 else 0
+        
+        # 计算解析速度
+        total_size_mb = total_size / (1024 * 1024)
+        avg_speed = (total_size_mb / total_time) if total_time > 0 else 0
+        
+        # 生成报告
+        report = f"""# 解析性能报告
+
+## 总体情况
+- 总解析文件数: {total_files}
+- 成功解析: {successful_files}
+- 解析失败: {failed_files}
+- 总文件大小: {total_size_mb:.2f} MB
+- 总耗时: {total_time:.2f} 秒
+- 平均解析速度: {avg_speed:.2f} MB/秒
+
+## 文件类型分布
+- 文本文件: {stats['text_files']} 个, {stats['text_size'] / (1024 * 1024):.2f} MB, 占比: {text_ratio:.2f}%
+- 图片文件: {stats['image_files']} 个, {stats['image_size'] / (1024 * 1024):.2f} MB, 占比: {image_ratio:.2f}%
+- 音频文件: {stats['audio_files']} 个, {stats['audio_size'] / (1024 * 1024):.2f} MB, 占比: {audio_ratio:.2f}%
+- 视频文件: {stats['video_files']} 个, {stats['video_size'] / (1024 * 1024):.2f} MB, 占比: {video_ratio:.2f}%
+- PDF文件: {stats['pdf_files']} 个, {stats['pdf_size'] / (1024 * 1024):.2f} MB, 占比: {pdf_ratio:.2f}%
+- Office文件: {stats['office_files']} 个, {stats['office_size'] / (1024 * 1024):.2f} MB, 占比: {office_ratio:.2f}%
+
+## 性能分析
+- 文本类平均解析速度: {(stats['text_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有文本文件)
+- 图片类平均解析速度: {(stats['image_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有图片文件)
+- 音频类平均解析速度: {(stats['audio_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有音频文件)
+- 视频类平均解析速度: {(stats['video_size'] / (1024 * 1024) / total_time):.2f} MB/秒 (如果有视频文件)
+"""
+        
+        return report

+ 38 - 26
parser/dockerfile

@@ -1,34 +1,46 @@
-# 基础镜像:使用Python 3.12-alpine
-FROM python:3.12-alpine
+# 基础镜像:使用Python官方镜像(更稳定)
+FROM python:3.11-slim
 
 # 设置工作目录
 WORKDIR /app
 
-
+# 环境变量
+ENV PYTHONIOENCODING=utf-8 \
+    PIP_DEFAULT_TIMEOUT=120 \
+    PIP_DISABLE_PIP_VERSION_CHECK=1 \
+    DEBIAN_FRONTEND=noninteractive \
+    PYTHONUNBUFFERED=1
+
+# ========== 系统依赖安装 ==========
+RUN sed -i 's|deb.debian.org|mirrors.ustc.edu.cn|g' /etc/apt/sources.list.d/debian.sources && \
+    apt-get update && \
+    apt-get install -y --no-install-recommends \
+    libglib2.0-0 \
+    libsm6 \
+    libxext6 \
+    libxrender-dev \
+    libgomp1 \
+    libgthread-2.0-0 \
+    libgtk-3-0 \
+    libgstreamer1.0-0 \
+    libgstreamer-plugins-base1.0-0 \
+    ffmpeg \
+    ca-certificates \
+    && rm -rf /var/lib/apt/lists/*
+
+# ========== 配置pip源(国内加速) ==========
+RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && \
+    pip config set global.trusted-host mirrors.aliyun.com
+
+# ========== 安装Python依赖(包括所有系统库的Python包装) ==========
 COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# ========== 复制应用代码 ==========
+COPY . .
 
-# 设置环境变量
-ENV PYTHONUNBUFFERED=1 \
-    PYTHONDONTWRITEBYTECODE=1 \
-    PIP_NO_CACHE_DIR=off \
-    PIP_DISABLE_PIP_VERSION_CHECK=on
-
-# 安装编译依赖并编译安装所有需要的包
-RUN apk add --no-cache \
-    gcc \
-    musl-dev \
-    linux-headers \
-    && pip install -i https://mirrors.aliyun.com/pypi/simple/ psutil==5.9.5 \
-    # 注意:在这里安装所有需要编译的依赖
-    && pip install -i https://mirrors.aliyun.com/pypi/simple/ -r requirements.txt \
-    # 最后清理编译工具
-    && apk del gcc musl-dev linux-headers
-
-# 复制服务代码
-COPY parse_service.py .
-
-# 暴露服务端口
+# ========== 暴露端口 ==========
 EXPOSE 8000
 
-# 启动命令
-CMD ["python", "parse_service.py", "--host", "0.0.0.0", "--port", "8000"]
+# ========== 启动应用 ==========
+CMD ["python", "-m", "uvicorn", "parse_service:app", "--host", "0.0.0.0", "--port", "8000"]

BIN
parser/examples/test1.pdf


+ 25 - 0
parser/models/result.py

@@ -0,0 +1,25 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+
+@dataclass
+class ParseResult:
+    """解析结果的统一输出结构"""
+    content: str = ""  # 解析出的 Markdown 文本
+    metadata: Dict[str, any] = field(default_factory=dict)  # 页数、作者、时长等元数据
+    file_type: str = ""  # 识别出的具体类型
+    tables: List[Dict] = field(default_factory=list)  # 提取出的结构化表格数据
+
+    def to_dict(self) -> Dict[str, any]:
+        """转换为字典格式"""
+        return {
+            "content": self.content,
+            "metadata": self.metadata,
+            "file_type": self.file_type,
+            "tables": self.tables
+        }
+
+    def to_json(self) -> str:
+        """转换为JSON字符串"""
+        import json
+        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

+ 26 - 79
parser/parse_service.py

@@ -3,10 +3,11 @@ import time
 import threading
 import argparse
 from fastapi import FastAPI, BackgroundTasks
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 from typing import Dict, Optional, List
-import psutil
 import os
+from core.router import ParserFactory
+
 
 # 初始化FastAPI应用
 app = FastAPI(title="Python解析服务", version="1.0")
@@ -22,114 +23,60 @@ service_status = {
 # 锁机制,保证多线程安全
 task_lock = threading.Lock()
 
-# 任务请求模型
-class ParseTask(BaseModel):
-    task_id: str
-    file_path: str
-    parse_params: Optional[Dict] = None  # 解析参数,如解析类型、阈值等
 
 # ------------------------------
-# 核心解析逻辑(模拟真实解析器调用)
+# 请求模型定义
 # ------------------------------
-def parse_task_worker(task_id: str, file_path: str, parse_params: Dict):
-    """
-    后台解析任务执行函数
-    """
-    try:
-        with task_lock:
-            task_status[task_id] = {
-                "status": "running",
-                "progress": 0,
-                "result": None,
-                "error": None
-            }
-        
-        # 模拟解析过程(实际场景替换为真实解析器调用)
-        total_steps = 10
-        for step in range(total_steps):
-            time.sleep(1)  # 模拟解析耗时
-            progress = (step + 1) * 10
-            with task_lock:
-                task_status[task_id]["progress"] = progress
-        
-        # 解析完成,模拟返回结果
-        with task_lock:
-            task_status[task_id]["status"] = "completed"
-            task_status[task_id]["result"] = {
-                "file_path": file_path,
-                "parse_result": "解析成功(模拟结果)",
-                "parse_params": parse_params,
-                "finish_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            }
-    
-    except Exception as e:
-        with task_lock:
-            task_status[task_id]["status"] = "failed"
-            task_status[task_id]["error"] = str(e)
+class ExecuteRequest(BaseModel):
+    file_path: str=Field(alias="filePath")
+    task_id:str=Field(alias="taskId")
 
 # ------------------------------
 # 接口定义
 # ------------------------------
 @app.post("/execute", summary="接收解析任务并执行")
-async def execute_task(task: ParseTask, background_tasks: BackgroundTasks):
+async def execute_task(request: ExecuteRequest):
     """
     接收Java端下发的解析任务,后台异步执行
     """
-    # 检查任务ID是否已存在
-    if task.task_id in task_status:
-        return {
-            "code": 400,
-            "msg": f"任务ID {task.task_id} 已存在",
-            "data": None
-        }
+
+    factory = ParserFactory()
     
-    # 提交后台任务执行
-    background_tasks.add_task(parse_task_worker, task.task_id, task.file_path, task.parse_params)
+    # 解析文件
+    result = await factory.parse(request.file_path)
     
-    return {
-        "code": 200,
-        "msg": "任务接收成功,已开始执行",
-        "data": {"task_id": task.task_id}
-    }
-
-@app.get("/status/{task_id}", summary="查询指定任务状态")
-async def get_task_status(task_id: str):
-    """
-    查询单个任务的执行状态、进度、结果
-    """
-    if task_id not in task_status:
-        return {
-            "code": 404,
-            "msg": f"任务ID {task_id} 不存在",
-            "data": None
-        }
     
+    print(result.content)
+    
+    # 生成并显示性能报告
+    report = factory.generate_performance_report()
+    print("\n" + "="*80)
+    print(report)
+    print("="*80)
+    
+
     return {
         "code": 200,
-        "msg": "查询成功",
-        "data": task_status[task_id]
+        "msg": "任务执行成功",
+        "data": result.content
     }
 
-
 @app.get("/status", summary="状态接口")
 async def health_check():
     """
     返回实例健康状态、资源使用情况
     """
-    cpu_usage = psutil.cpu_percent(interval=1)
     
-    # 内存信息
-    memory = psutil.virtual_memory()
-    memory_usage = memory.percent
+    
     
     return {
     "code": 200,
     "msg": "success",
     "data": {
         "status": 0,
-        "cpu_usage": cpu_usage,
+        "cpu_usage": 0.2,
         "gpu_usage": 0.0,
-        "memory_usage": memory_usage,
+        "memory_usage": 0.2,
         "gpu_memory": 0.0
     }
 }

+ 137 - 0
parser/parsers/audio_parser.py

@@ -0,0 +1,137 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+from utils.ffmpeg_wrapper import FFmpegWrapper
+import os
+import tempfile
+import requests
+
+
+class AudioParser(Parser):
+    """音频文件解析器"""
+    
+    def __init__(self):
+        self.ffmpeg = FFmpegWrapper()
+        # Qwen3-ASR模型配置 - 使用专门的音频转录端点
+        self.qwen_asr_api_url = "http://10.192.72.13:7283/v1/audio/transcriptions"
+        log.info("音频解析器初始化完成,使用本地部署的Qwen3-ASR模型")
+    
+    async def parse(self, file_path: str) -> ParseResult:
+        """
+        解析音频文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        log.info(f"开始解析音频文件: {file_path}")
+        temp_wav_path = None
+        try:
+            # 1. 预处理:转换为16k/16bit/mono wav
+            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
+                temp_wav_path = temp_file.name
+            
+            log.info(f"创建临时文件: {temp_wav_path}")
+            self.ffmpeg.convert_audio(file_path, temp_wav_path)
+            log.info(f"音频转换完成: {temp_wav_path}")
+            
+            # 2. 使用Qwen3-ASR进行语音识别
+            log.info("开始使用Qwen3-ASR进行语音识别...")
+            log.info(f"使用的API地址: {self.qwen_asr_api_url}")
+            
+            content = []
+            result = None
+            try:
+                # 检查文件大小
+                file_size = os.path.getsize(temp_wav_path)
+                log.info(f"音频文件大小: {file_size / (1024 * 1024):.2f} MB")
+                
+                # 使用专门的音频转录端点,支持文件上传
+                log.info("准备使用文件上传方式进行语音识别...")
+                
+                # 构建请求数据
+                session = requests.Session()
+                session.trust_env = False  # 禁用环境变量中的代理设置
+                
+                # 使用multipart/form-data上传文件
+                files = {
+                    'file': ('audio.wav', open(temp_wav_path, 'rb'), 'audio/wav')
+                }
+                data = {
+                    'model': '/data/shared/Qwen3-ASR/qwen/Qwen3-ASR-1.7B',
+                    'language': 'zh',
+                    'response_format': 'json'
+                }
+                
+                # 发送请求
+                log.info("开始发送请求...")
+                # 增加超时时间,音频处理可能需要更长时间
+                response = session.post(self.qwen_asr_api_url, files=files, data=data, timeout=600)
+                log.info(f"请求完成,状态码: {response.status_code}")
+                
+                # 打印响应内容以进行调试
+                if response.status_code != 200:
+                    log.warning(f"响应内容: {response.text}")
+                
+                response.raise_for_status()
+                result = response.json()
+                
+                log.info("Qwen3-ASR语音识别完成")
+                log.info(f"识别结果: {result}")
+                
+                # 3. 构建解析结果
+                if result and 'text' in result:
+                    full_text = result['text']
+                    # 清理识别结果中的标记
+                    clean_text = full_text.replace('language Chinese<asr_text>', '').strip()
+                    content.append(f"完整文本: {clean_text}")
+                else:
+                    log.warning("解析失败:未获取到有效结果")
+                    content.append("解析失败:未获取到有效结果")
+                
+                log.info(f"构建完成,内容长度: {len(content)}")
+            except requests.exceptions.Timeout as e:
+                log.error(f"Qwen3-ASR语音识别超时: {str(e)}")
+                content.append("语音识别超时,请检查服务是否正常运行")
+            except requests.exceptions.ConnectionError as e:
+                log.error(f"Qwen3-ASR语音识别连接错误: {str(e)}")
+                content.append("语音识别连接错误,请检查服务地址是否正确")
+            except Exception as e:
+                log.error(f"Qwen3-ASR语音识别失败: {str(e)}")
+                import traceback
+                log.error(f"异常堆栈: {traceback.format_exc()}")
+                # 即使失败,也尝试返回一个基本结果
+                content.append("语音识别失败,但文件已成功处理")
+            
+            # 清理临时文件
+            if temp_wav_path and os.path.exists(temp_wav_path):
+                os.remove(temp_wav_path)
+                log.info(f"临时文件已清理: {temp_wav_path}")
+            
+            return ParseResult(
+                content="\n".join(content),
+                metadata={
+                    "parser": "Qwen3-ASR",
+                    "file_size": os.path.getsize(file_path),
+                    "api_url": self.qwen_asr_api_url
+                },
+                file_type="audio"
+            )
+        except Exception as e:
+            log.error(f"音频文件解析失败: {str(e)}")
+            import traceback
+            log.error(f"异常堆栈: {traceback.format_exc()}")
+            # 清理临时文件
+            if temp_wav_path and os.path.exists(temp_wav_path):
+                try:
+                    os.remove(temp_wav_path)
+                    log.info(f"临时文件已清理: {temp_wav_path}")
+                except:
+                    pass
+            return ParseResult(
+                content="",
+                metadata={"error": str(e)},
+                file_type="audio"
+            )

+ 282 - 0
parser/parsers/native_parser.py

@@ -0,0 +1,282 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+import fitz  # PyMuPDF
+from docx import Document
+import openpyxl
+from pptx import Presentation
+import os
+
+
+class NativeDocParser(Parser):
+    """原生文档解析器,处理Office文档和原生PDF"""
+    
+    async def parse(self, file_path: str) -> ParseResult:
+        """
+        解析原生文档
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        log.info(f"开始解析原生文档: {file_path}")
+        try:
+            # 根据文件扩展名判断文件类型
+            ext = os.path.splitext(file_path)[1].lower()
+            
+            if ext == '.pdf':
+                return await self._parse_pdf(file_path)
+            elif ext == '.docx':
+                return await self._parse_docx(file_path)
+            elif ext == '.doc':
+                return await self._parse_doc(file_path)
+            elif ext == '.xlsx':
+                return await self._parse_xlsx(file_path)
+            elif ext == '.pptx':
+                return await self._parse_pptx(file_path)
+            else:
+                raise Exception(f"不支持的文件扩展名: {ext}")
+        except Exception as e:
+            log.error(f"原生文档解析失败: {str(e)}")
+            return ParseResult(
+                content="",
+                metadata={"error": str(e)},
+                file_type="unknown"
+            )
+    
+    async def _parse_pdf(self, file_path: str) -> ParseResult:
+        """
+        解析PDF文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        doc = fitz.open(file_path)
+        content = []
+        tables = []
+        page_count = len(doc)
+        
+        # 遍历所有页面
+        for page_num in range(page_count):
+            page = doc[page_num]
+            # 提取文本
+            text = page.get_text()
+            content.append(f"# 第{page_num + 1}页\n{text}")
+            
+            # 提取表格(PyMuPDF的表格提取功能有限)
+            # 这里可以根据需要使用更高级的表格提取库
+        
+        doc.close()
+        
+        return ParseResult(
+            content="\n\n".join(content),
+            metadata={
+                "page_count": page_count,
+                "file_size": os.path.getsize(file_path)
+            },
+            file_type="pdf",
+            tables=tables
+        )
+    
+    async def _parse_docx(self, file_path: str) -> ParseResult:
+        """
+        解析Word文档
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        doc = Document(file_path)
+        content = []
+        tables = []
+        
+        # 提取标题和正文
+        for para in doc.paragraphs:
+            if para.style.name.startswith('Heading'):
+                # 根据标题级别添加Markdown标题
+                level = int(para.style.name.split(' ')[1])
+                content.append(f"{'#' * level} {para.text}")
+            else:
+                content.append(para.text)
+        
+        # 提取表格
+        for table_idx, table in enumerate(doc.tables):
+            table_content = []
+            table_data = []
+            
+            # 提取表头
+            header_cells = table.rows[0].cells
+            header = [cell.text.strip() for cell in header_cells]
+            table_content.append('| ' + ' | '.join(header) + ' |')
+            table_content.append('| ' + ' | '.join(['---'] * len(header)) + ' |')
+            table_data.append(header)
+            
+            # 提取表格内容
+            for row in table.rows[1:]:
+                cells = row.cells
+                row_data = [cell.text.strip() for cell in cells]
+                table_content.append('| ' + ' | '.join(row_data) + ' |')
+                table_data.append(row_data)
+            
+            content.append('\n'.join(table_content))
+            tables.append({
+                "table_id": table_idx,
+                "data": table_data
+            })
+        
+        return ParseResult(
+            content="\n".join(content),
+            metadata={
+                "paragraph_count": len(doc.paragraphs),
+                "table_count": len(doc.tables),
+                "file_size": os.path.getsize(file_path)
+            },
+            file_type="docx",
+            tables=tables
+        )
+    
+    async def _parse_xlsx(self, file_path: str) -> ParseResult:
+        """
+        解析Excel文档
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        wb = openpyxl.load_workbook(file_path)
+        content = []
+        tables = []
+        
+        # 遍历所有工作表
+        for sheet_idx, sheet_name in enumerate(wb.sheetnames):
+            sheet = wb[sheet_name]
+            content.append(f"# 工作表: {sheet_name}")
+            
+            # 提取表格数据
+            table_data = []
+            max_row = sheet.max_row
+            max_col = sheet.max_column
+            
+            # 提取表头
+            header = []
+            for col in range(1, max_col + 1):
+                cell_value = sheet.cell(row=1, column=col).value
+                header.append(str(cell_value) if cell_value else '')
+            table_data.append(header)
+            
+            # 提取表格内容
+            for row in range(2, max_row + 1):
+                row_data = []
+                for col in range(1, max_col + 1):
+                    cell_value = sheet.cell(row=row, column=col).value
+                    row_data.append(str(cell_value) if cell_value else '')
+                table_data.append(row_data)
+            
+            # 转换为Markdown表格
+            if header:
+                markdown_table = []
+                markdown_table.append('| ' + ' | '.join(header) + ' |')
+                markdown_table.append('| ' + ' | '.join(['---'] * len(header)) + ' |')
+                for row_data in table_data[1:]:
+                    markdown_table.append('| ' + ' | '.join(row_data) + ' |')
+                content.append('\n'.join(markdown_table))
+            
+            tables.append({
+                "sheet_name": sheet_name,
+                "data": table_data
+            })
+        
+        wb.close()
+        
+        return ParseResult(
+            content="\n\n".join(content),
+            metadata={
+                "sheet_count": len(wb.sheetnames),
+                "file_size": os.path.getsize(file_path)
+            },
+            file_type="xlsx",
+            tables=tables
+        )
+    
+    async def _parse_pptx(self, file_path: str) -> ParseResult:
+        """
+        解析PPT文档
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        prs = Presentation(file_path)
+        content = []
+        
+        # 遍历所有幻灯片
+        for slide_idx, slide in enumerate(prs.slides):
+            content.append(f"# 幻灯片 {slide_idx + 1}")
+            
+            # 提取标题
+            for shape in slide.shapes:
+                if hasattr(shape, 'text_frame') and shape.text_frame.text:
+                    if shape == slide.shapes[0]:  # 假设第一个形状是标题
+                        content.append(f"## {shape.text_frame.text}")
+                    else:
+                        content.append(shape.text_frame.text)
+            
+            # 提取备注
+            if slide.notes_slide:
+                notes = slide.notes_slide.notes_text_frame.text
+                if notes:
+                    content.append(f"### 备注\n{notes}")
+        
+        return ParseResult(
+            content="\n\n".join(content),
+            metadata={
+                "slide_count": len(prs.slides),
+                "file_size": os.path.getsize(file_path)
+            },
+            file_type="pptx"
+        )
+    
+    async def _parse_doc(self, file_path: str) -> ParseResult:
+        """
+        解析.doc文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        # 使用antiword提取.doc文件内容
+        import subprocess
+        try:
+            result = subprocess.run(
+                ['antiword', file_path],
+                capture_output=True,
+                text=True,
+                check=True
+            )
+            text = result.stdout
+        except Exception as e:
+            log.error(f"antiword解析失败: {str(e)}")
+            raise Exception(f"antiword解析失败: {str(e)}")
+        
+        content = [text]
+        
+        return ParseResult(
+            content="\n".join(content),
+            metadata={
+                "file_size": os.path.getsize(file_path)
+            },
+            file_type="doc"
+        )

+ 43 - 0
parser/parsers/text_parser.py

@@ -0,0 +1,43 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+
+
+class TextParser(Parser):
+    """文本文件解析器"""
+    
+    async def parse(self, file_path: str) -> ParseResult:
+        """
+        解析文本文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        log.info(f"开始解析文本文件: {file_path}")
+        try:
+            # 读取文本文件内容
+            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
+                content = f.read()
+            
+            # 构建解析结果
+            result = ParseResult(
+                content=content,
+                metadata={
+                    "file_size": len(content),
+                    "line_count": len(content.split('\n'))
+                },
+                file_type="text"
+            )
+            
+            log.info(f"文本文件解析完成,大小: {len(content)} 字符")
+            return result
+        except Exception as e:
+            log.error(f"文本文件解析失败: {str(e)}")
+            return ParseResult(
+                content="",
+                metadata={"error": str(e)},
+                file_type="text"
+            )

+ 151 - 0
parser/parsers/video_parser.py

@@ -0,0 +1,151 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+from utils.ffmpeg_wrapper import FFmpegWrapper
+import os
+import tempfile
+import base64
+import requests
+from parsers.audio_parser import AudioParser
+
+
+class VideoParser(Parser):
+    """视频文件解析器"""
+    
+    def __init__(self):
+        self.ffmpeg = FFmpegWrapper()
+        self.audio_parser = AudioParser()
+        # Qwen3-VL模型配置
+        self.qwen_api_url = "http://10.192.72.13:7280/v1/chat/completions"
+    
+    async def parse(self, file_path: str) -> ParseResult:
+        """
+        解析视频文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        log.info(f"开始解析视频文件: {file_path}")
+        try:
+            # 1. 提取音频轨道
+            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
+                temp_audio_path = temp_file.name
+            
+            self.ffmpeg.extract_audio(file_path, temp_audio_path)
+            log.info(f"音频提取完成: {temp_audio_path}")
+            
+            # 2. 使用AudioParser解析音频
+            audio_result = await self.audio_parser.parse(temp_audio_path)
+            log.info("音频解析完成")
+            
+            # 3. 提取关键帧
+            frame_results = []  # 移到外部定义
+            with tempfile.TemporaryDirectory() as temp_dir:
+                # 使用固定频率先提取帧,再通过帧差法筛选关键帧
+                interval_seconds = 10
+                diff_threshold = 15.0
+                keyframes = self.ffmpeg.extract_keyframes(file_path, temp_dir, interval=interval_seconds, diff_threshold=diff_threshold)
+                log.info(f"关键帧提取完成(帧差阈值={diff_threshold}),共{len(keyframes)}张")
+
+                # 4. 使用Qwen3-VL解析关键帧;根据帧文件名计算时间点
+                for idx, frame_path in enumerate(keyframes):
+                    try:
+                        frame_content = self._parse_frame_with_qwen(frame_path)
+                        log.info(f"解析关键帧 {idx+1} 结果长度: {len(frame_content) if frame_content else 0}")
+                        if frame_content:
+                            # 从文件名解析帧序号,filename like frame_000001.jpg
+                            try:
+                                base = os.path.basename(frame_path)
+                                num_part = base.split('_')[1].split('.')[0]
+                                frame_index = int(num_part)
+                                time_second = (frame_index - 1) * interval_seconds
+                            except Exception:
+                                time_second = idx * interval_seconds
+
+                            frame_results.append((time_second, frame_content))
+                            log.info(f"添加关键帧 到结果列表,时间:{time_second}s")
+                        else:
+                            log.warning(f"关键帧 {idx+1} 解析结果为空")
+                    except Exception as e:
+                        log.warning(f"解析关键帧 {idx+1} 失败: {str(e)}")
+            
+            log.info(f"关键帧解析完成,frame_results长度: {len(frame_results)}")
+            
+            # 5. 合并结果
+            content = []
+            content.append("# 音频内容")
+            content.append(audio_result.content)
+            
+            if frame_results:
+                log.info("开始添加画面内容到结果")
+                content.append("\n# 画面内容")
+                for time_second, frame_content in frame_results:
+                    content.append(f"\n## 第{time_second}秒")
+                    content.append(frame_content)
+                    log.info(f"添加第{time_second}秒画面内容,长度: {len(frame_content)}")
+            else:
+                log.warning("没有画面内容可以添加")
+            
+            # 清理临时文件
+            if os.path.exists(temp_audio_path):
+                os.remove(temp_audio_path)
+            
+            return ParseResult(
+                content="\n".join(content),
+                metadata={
+                    "parser": "VideoParser",
+                    "file_size": os.path.getsize(file_path),
+                    "audio_parser": "Qwen3-ASR",
+                    "visual_parser": "Qwen3-VL",
+                    "keyframe_count": len(keyframes)
+                },
+                file_type="video"
+            )
+        except Exception as e:
+            log.error(f"视频文件解析失败: {str(e)}")
+            # 清理临时文件
+            if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path):
+                os.remove(temp_audio_path)
+            return ParseResult(
+                content="",
+                metadata={"error": str(e)},
+                file_type="video"
+            )
+    
+    def _parse_frame_with_qwen(self, image_path: str) -> str:
+        """
+        使用Qwen3-VL模型解析图片
+        
+        Args:
+            image_path: 图片路径
+            
+        Returns:
+            str: 解析结果
+        """
+        log.info(f"使用Qwen3-VL解析图片: {image_path}")
+        
+        # 编码图片
+        with open(image_path, "rb") as f:
+            base64_image = base64.b64encode(f.read()).decode("utf-8")
+
+        # 发送请求
+        payload = {
+            "model": "/model",
+            "messages": [{
+                "role": "user",
+                "content": [
+                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
+                    {"type": "text", "text": "详细描述这张图片的内容,包括人物、物体、场景、文字等所有可见信息"}
+                ]
+            }],
+            "max_tokens": 512
+        }
+
+        response = requests.post(self.qwen_api_url, json=payload, timeout=120)
+        response.raise_for_status()
+        result = response.json()
+        
+        return result["choices"][0]["message"]["content"]

+ 336 - 0
parser/parsers/visual_parser.py

@@ -0,0 +1,336 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+import os
+import time
+import httpx
+import json
+import io
+import zipfile
+from pathlib import Path
+import asyncio
+
+# 延迟导入PaddleOCR,避免模块级初始化
+
+
+class VisualDocParser(Parser):
+    """视觉文档解析器,处理图片和扫描件PDF"""
+    
+    def __init__(self):
+        # MinerU API配置 - 使用本地部署的服务
+        self.mineru_api_key = ""
+        self.base_url = "http://10.192.72.13:7284"
+        self.model_version = "hybrid-auto-engine"
+        self.poll_interval_sec = 3.0
+        self.max_wait_sec = 300.0
+        log.info("VisualDocParser初始化完成,使用本地部署的MinerU服务")
+    
+    async def parse(self, file_path: str) -> ParseResult:
+        """
+        解析视觉文档
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        log.info(f"开始解析视觉文档: {file_path}")
+        try:
+            # 只使用MinerU API,避免PaddleOCR的初始化问题
+            result = await self._try_mineru(file_path)
+            if result:
+                return result
+            
+            # MinerU失败时,返回错误信息
+            return ParseResult(
+                content="",
+                metadata={"error": "MinerU API解析失败"},
+                file_type="visual"
+            )
+        except Exception as e:
+            log.error(f"视觉文档解析失败: {str(e)}")
+            return ParseResult(
+                content="",
+                metadata={"error": str(e)},
+                file_type="visual"
+            )
+    
+    async def _try_mineru(self, file_path: str) -> ParseResult:
+        """
+        尝试使用本地MinerU API解析
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果,如果失败返回None
+        """
+        try:
+            log.info("开始使用本地MinerU API解析文件")
+
+            file_path_obj = Path(file_path)
+            if not file_path_obj.exists():
+                raise FileNotFoundError(str(file_path))
+
+            log.info(f"Calling local MinerU for file: {file_path}")
+
+            # 直接使用本地API上传文件并获取结果
+            result = await self._upload_and_parse(file_path_obj)
+            
+            # 提取文本内容
+            text_content = self._extract_text_from_local_result(result)
+            
+            return ParseResult(
+                content=text_content,
+                metadata={
+                    "parser": "Local MinerU API",
+                    "file_size": file_path_obj.stat().st_size,
+                    "backend": self.model_version
+                },
+                file_type="visual"
+            )
+            
+        except Exception as e:
+            log.warning(f"本地MinerU API解析失败: {str(e)}")
+            return None
+    
+    async def _upload_and_parse(self, file_path: Path) -> dict:
+        """
+        上传文件并解析
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            dict: 解析结果
+        """
+        url = f"{self.base_url}/file_parse"
+        
+        # 准备表单数据
+        files = {
+            "files": (file_path.name, open(file_path, 'rb'))
+        }
+        
+        # 准备参数
+        data = {
+            "backend": self.model_version,
+            "lang_list": ["ch"],
+            "return_md": True,
+            "formula_enable": True,
+            "table_enable": True
+        }
+
+        async with httpx.AsyncClient(timeout=300) as client:
+            resp = await client.post(url, files=files, data=data)
+            resp.raise_for_status()
+            result = resp.json()
+
+        return result
+
+    def _extract_text_from_local_result(self, result: dict) -> str:
+        """
+        从本地MinerU返回的结果中提取文本内容
+        
+        Args:
+            result: 本地MinerU返回的结果
+            
+        Returns:
+            str: 提取的文本内容
+        """
+        # 处理不同可能的返回结构
+        text_parts = []
+        
+        if isinstance(result, dict):
+            # 检查是否有results字段(新的返回结构)
+            if "results" in result:
+                results = result["results"]
+                if isinstance(results, dict):
+                    for key, value in results.items():
+                        if isinstance(value, dict):
+                            # 检查是否有md_content字段
+                            if "md_content" in value:
+                                text_parts.append(str(value["md_content"]))
+                            # 检查是否有text字段
+                            elif "text" in value:
+                                text_parts.append(str(value["text"]))
+            # 检查是否有markdown内容
+            elif "markdown" in result:
+                text_parts.append(str(result["markdown"]))
+            # 检查是否有text字段
+            elif "text" in result:
+                text_parts.append(str(result["text"]))
+            # 检查是否有content字段
+            elif "content" in result:
+                if isinstance(result["content"], str):
+                    text_parts.append(result["content"])
+                elif isinstance(result["content"], list):
+                    for item in result["content"]:
+                        if isinstance(item, dict) and "text" in item:
+                            text_parts.append(str(item["text"]))
+                        elif isinstance(item, str):
+                            text_parts.append(item)
+        
+        return "\n\n".join(text_parts)
+    
+    def _safe_stem(self, stem: str) -> str:
+        """
+        创建安全的缓存键
+        
+        Args:
+            stem: 文件stem
+            
+        Returns:
+            str: 安全的缓存键
+        """
+        import re
+        return re.sub(r'[^a-zA-Z0-9_-]', '_', stem)
+    
+    def _extract_text_from_payload(self, payload: dict) -> str:
+        """
+        从MinerU返回的payload中提取文本内容
+        
+        Args:
+            payload: MinerU返回的payload
+            
+        Returns:
+            str: 提取的文本内容
+        """
+        # 根据MinerU API返回的结构提取文本
+        text_parts = []
+        
+        # 处理不同可能的返回结构
+        if isinstance(payload, dict):
+            # 检查是否有text字段
+            if "text" in payload:
+                text_parts.append(str(payload["text"]))
+            # 检查是否有content字段
+            elif "content" in payload:
+                if isinstance(payload["content"], str):
+                    text_parts.append(payload["content"])
+                elif isinstance(payload["content"], list):
+                    for item in payload["content"]:
+                        if isinstance(item, dict) and "text" in item:
+                            text_parts.append(str(item["text"]))
+                        elif isinstance(item, str):
+                            text_parts.append(item)
+            # 检查是否有pages字段
+            elif "pages" in payload:
+                for page_num, page_content in enumerate(payload["pages"], 1):
+                    text_parts.append(f"# 第{page_num}页")
+                    if isinstance(page_content, str):
+                        text_parts.append(page_content)
+                    elif isinstance(page_content, dict) and "text" in page_content:
+                        text_parts.append(str(page_content["text"]))
+        elif isinstance(payload, list):
+            for item in payload:
+                if isinstance(item, dict) and "text" in item:
+                    text_parts.append(str(item["text"]))
+                elif isinstance(item, str):
+                    text_parts.append(item)
+        
+        return "\n\n".join(text_parts)
+    
+    async def _use_paddleocr(self, file_path: str) -> ParseResult:
+        """
+        使用PaddleOCR解析
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        log.info("使用PaddleOCR解析视觉文档")
+        
+        # 检查PaddleOCR是否初始化成功
+        if self.ocr is None:
+            log.error("PaddleOCR未初始化,无法解析")
+            return ParseResult(
+                content="",
+                metadata={"error": "PaddleOCR未初始化"},
+                file_type="visual"
+            )
+        
+        # 对于PDF文件,需要先转换为图片
+        if file_path.endswith('.pdf'):
+            return await self._ocr_pdf(file_path)
+        else:
+            return await self._ocr_image(file_path)
+    
+    async def _ocr_pdf(self, file_path: str) -> ParseResult:
+        """
+        OCR处理PDF文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        import fitz  # PyMuPDF
+        
+        doc = fitz.open(file_path)
+        content = []
+        page_count = len(doc)
+        
+        # 遍历所有页面
+        for page_num in range(page_count):
+            page = doc[page_num]
+            # 将页面转换为图片
+            pix = page.get_pixmap(dpi=300)
+            img_path = f"temp_page_{page_num}.png"
+            pix.save(img_path)
+            
+            # OCR处理图片
+            ocr_result = self.ocr.ocr(img_path, cls=True)
+            page_text = []
+            
+            for line in ocr_result:
+                for word_info in line:
+                    page_text.append(word_info[1][0])
+            
+            content.append(f"# 第{page_num + 1}页\n{' '.join(page_text)}")
+            
+            # 删除临时图片
+            if os.path.exists(img_path):
+                os.remove(img_path)
+        
+        doc.close()
+        
+        return ParseResult(
+            content="\n\n".join(content),
+            metadata={
+                "parser": "PaddleOCR",
+                "page_count": page_count,
+                "file_size": os.path.getsize(file_path)
+            },
+            file_type="pdf_scanned"
+        )
+    
+    async def _ocr_image(self, file_path: str) -> ParseResult:
+        """
+        OCR处理图片文件
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            ParseResult: 解析结果
+        """
+        # 使用PaddleOCR识别图片
+        ocr_result = self.ocr.ocr(file_path, cls=True)
+        content = []
+        
+        for line in ocr_result:
+            for word_info in line:
+                content.append(word_info[1][0])
+        
+        return ParseResult(
+            content=' '.join(content),
+            metadata={
+                "parser": "PaddleOCR",
+                "file_size": os.path.getsize(file_path)
+            },
+            file_type="image"
+        )

+ 25 - 2
parser/requirements.txt

@@ -1,5 +1,28 @@
 fastapi==0.104.1
 uvicorn==0.24.0
 pydantic==2.4.2
-#psutil==5.9.6
-python-multipart==0.0.6
+python-multipart==0.0.6
+
+# 基础库
+filetype==1.2.0
+loguru==0.7.2
+pandas>=2.2.0
+
+# 文档处理
+PyMuPDF>=1.24.0
+python-docx==0.8.11
+openpyxl==3.1.2
+python-pptx==0.6.21
+
+# OCR处理
+paddleocr>=2.8.0
+paddlepaddle>=3.0.0
+
+# 音频处理
+funasr>=0.3.1
+
+# 视频处理(FFmpeg需要系统安装)
+
+# 开发工具
+pytest==7.4.3
+black==23.11.0

+ 165 - 0
parser/utils/ffmpeg_wrapper.py

@@ -0,0 +1,165 @@
+import subprocess
+import os
+from typing import Optional
+try:
+    import cv2
+    import numpy as np
+except Exception:
+    cv2 = None
+    np = None
+
+
+class FFmpegWrapper:
+    """FFmpeg命令行包装工具"""
+    
+    def __init__(self):
+        self.ffmpeg_path = "ffmpeg"  # 假设ffmpeg已在系统PATH中
+    
+    def extract_audio(self, video_path: str, output_audio_path: str) -> bool:
+        """
+        从视频中提取音频轨道
+        
+        Args:
+            video_path: 视频文件路径
+            output_audio_path: 输出音频文件路径
+            
+        Returns:
+            bool: 操作是否成功
+        """
+        try:
+            cmd = [
+                self.ffmpeg_path,
+                "-i", video_path,
+                "-vn",  # 禁用视频
+                "-acodec", "pcm_s16le",  # 16位PCM
+                "-ar", "16000",  # 16kHz采样率
+                "-ac", "1",  # 单声道
+                "-y",  # 覆盖输出文件
+                output_audio_path
+            ]
+            
+            subprocess.run(cmd, check=True, capture_output=True, text=True)
+            return True
+        except subprocess.CalledProcessError as e:
+            raise Exception(f"音频提取失败: {e.stderr}")
+    
+    def convert_audio(self, input_audio_path: str, output_audio_path: str) -> bool:
+        """
+        转换音频格式为16k/16bit/mono wav
+        
+        Args:
+            input_audio_path: 输入音频文件路径
+            output_audio_path: 输出音频文件路径
+            
+        Returns:
+            bool: 操作是否成功
+        """
+        try:
+            cmd = [
+                self.ffmpeg_path,
+                "-i", input_audio_path,
+                "-acodec", "pcm_s16le",
+                "-ar", "16000",
+                "-ac", "1",
+                "-y",
+                output_audio_path
+            ]
+            
+            subprocess.run(cmd, check=True, capture_output=True, text=True)
+            return True
+        except subprocess.CalledProcessError as e:
+            raise Exception(f"音频转换失败: {e.stderr}")
+    
+    def extract_keyframes(self, video_path: str, output_dir: str, interval: int = 60, diff_threshold: Optional[float] = None) -> list:
+        """
+        从视频中提取关键帧
+        
+        Args:
+            video_path: 视频文件路径
+            output_dir: 输出目录
+            interval: 提取间隔(秒)
+            
+        Returns:
+            list: 提取的关键帧文件路径列表
+        """
+        try:
+            # 确保输出目录存在
+            os.makedirs(output_dir, exist_ok=True)
+            
+            # 提取关键帧(按固定频率导出帧)
+            output_pattern = os.path.join(output_dir, "frame_%06d.jpg")
+            cmd = [
+                self.ffmpeg_path,
+                "-i", video_path,
+                "-vf", f"fps=1/{interval}",  # 每 interval 秒一张
+                "-y",
+                output_pattern
+            ]
+            
+            subprocess.run(cmd, check=True, capture_output=True, text=True)
+            
+            # 收集提取的帧
+            frames = []
+            for file in os.listdir(output_dir):
+                if file.startswith("frame_") and file.endswith(".jpg"):
+                    frames.append(os.path.join(output_dir, file))
+
+            frames = sorted(frames)
+
+            # 如果未提供差异阈值,直接返回所有按固定频率提取的帧
+            if diff_threshold is None:
+                return frames
+
+            # 检查依赖
+            if cv2 is None or np is None:
+                raise Exception("OpenCV (opencv-python) 和 numpy 需要安装以启用帧差法(pip install opencv-python numpy)")
+
+            # 使用 OpenCV 的灰度图像计算帧差,比较当前帧与上一个边界帧(pre),当差异>=阈值时标记为关键帧
+            # 首先读取所有帧(彩色),以便能使用 cvtColor 按要求比较
+            imgs = []
+            for frame_path in frames:
+                try:
+                    img = cv2.imread(frame_path)  # BGR
+                    imgs.append(img)
+                except Exception:
+                    imgs.append(None)
+
+            filtered = []
+            # 为了加速计算,统一缩放尺寸 (width, height)
+            resize_to = (320, 240)
+
+            # 找到第一个有效帧作为初始关键帧
+            pre = None
+            for idx, img in enumerate(imgs):
+                if img is not None:
+                    filtered.append(frames[idx])
+                    pre = idx
+                    break
+
+            if pre is None:
+                return []
+
+            # 从下一个帧开始,比较当前帧与 imgs[pre]
+            for i in range(pre + 1, len(imgs)):
+                curr = imgs[i]
+                if curr is None:
+                    continue
+                prev = imgs[pre]
+                try:
+                    prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)
+                    curr_gray = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY)
+                    if resize_to is not None:
+                        prev_gray = cv2.resize(prev_gray, resize_to, interpolation=cv2.INTER_AREA)
+                        curr_gray = cv2.resize(curr_gray, resize_to, interpolation=cv2.INTER_AREA)
+
+                    diff_val = np.mean(np.abs(curr_gray.astype(int) - prev_gray.astype(int)))
+                except Exception:
+                    continue
+
+                if diff_val >= float(diff_threshold):
+                    filtered.append(frames[i])
+                    pre = i
+
+            return filtered
+        except subprocess.CalledProcessError as e:
+            raise Exception(f"关键帧提取失败: {e.stderr}")

+ 47 - 0
parser/utils/logger.py

@@ -0,0 +1,47 @@
+from loguru import logger
+import os
+
+
+class Logger:
+    """日志管理工具"""
+    
+    def __init__(self, log_file: str = "parsing.log"):
+        """
+        初始化日志配置
+        
+        Args:
+            log_file: 日志文件路径
+        """
+        # 移除默认的控制台输出
+        logger.remove()
+        
+        # 添加控制台输出
+        logger.add(
+            sink=lambda msg: print(msg, end=""),
+            level="INFO",
+            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
+        )
+        
+        # 添加文件输出
+        logger.add(
+            sink=log_file,
+            level="DEBUG",
+            rotation="100 MB",
+            compression="zip",
+            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
+        )
+    
+    @property
+    def log(self):
+        """
+        获取logger实例
+        
+        Returns:
+            logger: loguru logger实例
+        """
+        return logger
+
+
+# Create the global logger instance at import time; other modules import `log` from here.
+logger_instance = Logger()
+log = logger_instance.log

+ 51 - 0
parser/utils/mime_detector.py

@@ -0,0 +1,51 @@
+import filetype
+import os
+
+
+class MimeDetector:
+    """文件MIME类型检测工具"""
+    
+    def __init__(self):
+        pass
+    
+    def detect(self, file_path: str) -> str:
+        """
+        检测文件的MIME类型
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            str: MIME类型字符串
+        """
+        try:
+            # 使用filetype库检测文件类型
+            kind = filetype.guess(file_path)
+            if kind:
+                return kind.mime
+            else:
+                # 如果filetype无法检测,根据文件扩展名猜测
+                ext = os.path.splitext(file_path)[1].lower()
+                ext_to_mime = {
+                    '.txt': 'text/plain',
+                    '.md': 'text/markdown',
+                    '.pdf': 'application/pdf',
+                    '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
+                    '.doc': 'application/msword',
+                    '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
+                    '.xls': 'application/vnd.ms-excel',
+                    '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
+                    '.ppt': 'application/vnd.ms-powerpoint',
+                    '.jpg': 'image/jpeg',
+                    '.jpeg': 'image/jpeg',
+                    '.png': 'image/png',
+                    '.gif': 'image/gif',
+                    '.wav': 'audio/wav',
+                    '.mp3': 'audio/mpeg',
+                    '.mp4': 'video/mp4',
+                    '.avi': 'video/x-msvideo',
+                    '.mov': 'video/quicktime'
+                }
+                return ext_to_mime.get(ext, 'application/octet-stream')
+        except Exception as e:
+            raise Exception(f"文件类型检测失败: {str(e)}")

+ 80 - 0
parser/utils/stability.py

@@ -0,0 +1,80 @@
+import asyncio
+import functools
+from typing import Callable, Any, List, Coroutine
+from utils.logger import log
+
+
+def timeout(seconds: int):
+    """
+    超时装饰器,防止函数执行时间过长
+    
+    Args:
+        seconds: 超时时间(秒)
+        
+    Returns:
+        Callable: 装饰后的函数
+    """
+    def decorator(func: Callable) -> Callable:
+        @functools.wraps(func)
+        async def wrapper(*args, **kwargs) -> Any:
+            try:
+                return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
+            except asyncio.TimeoutError:
+                log.error(f"函数 {func.__name__} 执行超时,已超过 {seconds} 秒")
+                raise Exception(f"执行超时,已超过 {seconds} 秒")
+        return wrapper
+    return decorator
+
+
+class AsyncDispatcher:
+    """异步调度器,支持并发处理多个任务"""
+    
+    def __init__(self, max_concurrency: int = 5):
+        """
+        初始化异步调度器
+        
+        Args:
+            max_concurrency: 最大并发数
+        """
+        self.max_concurrency = max_concurrency
+    
+    async def run(self, tasks: List[Coroutine]) -> List[Any]:
+        """
+        并发执行多个任务
+        
+        Args:
+            tasks: 任务列表
+            
+        Returns:
+            List[Any]: 任务执行结果列表
+        """
+        log.info(f"开始并发执行 {len(tasks)} 个任务,最大并发数: {self.max_concurrency}")
+        
+        # 创建信号量控制并发
+        semaphore = asyncio.Semaphore(self.max_concurrency)
+        
+        async def bounded_task(task: Coroutine) -> Any:
+            async with semaphore:
+                try:
+                    return await task
+                except Exception as e:
+                    log.error(f"任务执行失败: {str(e)}")
+                    return None
+        
+        # 并发执行任务
+        results = await asyncio.gather(
+            *[bounded_task(task) for task in tasks],
+            return_exceptions=True
+        )
+        
+        # 处理异常结果
+        processed_results = []
+        for i, result in enumerate(results):
+            if isinstance(result, Exception):
+                log.error(f"第 {i+1} 个任务执行失败: {str(result)}")
+                processed_results.append(None)
+            else:
+                processed_results.append(result)
+        
+        log.info(f"并发任务执行完成,成功 {sum(1 for r in processed_results if r is not None)} 个")
+        return processed_results

+ 5 - 0
schedule-consumer/pom.xml

@@ -37,6 +37,11 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+        <!-- Spring WebFlux(WebClient) -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 1
schedule-consumer/src/main/java/cn/com/yusys/consumer/config/KafkaConsumerConfig.java

@@ -85,7 +85,7 @@ public class KafkaConsumerConfig {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
-        factory.setConcurrency(3);
+        factory.setConcurrency(1);
 
         // 【核心】注入错误处理器
         factory.setCommonErrorHandler(errorHandler);

+ 42 - 6
schedule-consumer/src/main/java/cn/com/yusys/consumer/listener/MessageListener.java

@@ -1,17 +1,26 @@
 package cn.com.yusys.consumer.listener;
 
+import cn.com.yusys.consumer.model.Task;
+import cn.com.yusys.consumer.util.ParseServiceClient;
+import cn.com.yusys.consumer.util.response.ExecuteResponse;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
+import java.util.UUID;
+
 @Component
 public class MessageListener {
     private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
 
+    @Autowired
+    private ParseServiceClient parseServiceClient;
+
     // 从配置读取逗号分隔的 Topic 字符串
     @Value("${kafka.topics.listen}")
     private String topicsConfig;
@@ -26,19 +35,46 @@ public class MessageListener {
         log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
                 record.topic(), record.key(), message, record.offset());
 
-        processBusinessLogic(message);
-        acknowledgment.acknowledge();
+        ExecuteResponse executeResponse = processBusinessLogic(message);
+
+        if (executeResponse != null && executeResponse.getCode()==200) {
+            log.info("=== [CONSUMER] 任务处理成功 ===");
+            acknowledgment.acknowledge();
+
+        }else{
+            log.info("=== [CONSUMER] 任务处理失败===");
+            try {
+                // 任务处理失败时睡眠5秒
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                // 捕获中断异常,恢复线程中断状态
+                log.error("=== [CONSUMER] sleep过程中线程被中断 ===", e);
+                Thread.currentThread().interrupt(); // 重置中断标记
+            }
+            log.info("=== [CONSUMER] sleep结束,继续处理后续任务 ===");
+
+        }
+
 
     }
 
-    private void processBusinessLogic(String message) {
+    private ExecuteResponse processBusinessLogic(String message) {
         if ("error".equals(message)) {
             throw new RuntimeException("Simulated Business Exception");
         }
+        
         try {
-            Thread.sleep(500);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+            // 调用ParseServiceClient执行任务
+            log.info("开始调用execute接口执行任务,消息内容:{}", message);
+            Task task=new Task();
+            task.setTaskId(UUID.randomUUID().toString());
+            task.setFilePath(message);
+            ExecuteResponse response = parseServiceClient.executeTask(task);
+            log.info("任务执行完成,响应:{}", response);
+            return response;
+        } catch (Exception e) {
+            log.error("调用execute接口执行任务失败", e);
+            throw new RuntimeException("任务执行失败", e);
         }
     }
 }

+ 20 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/model/Task.java

@@ -0,0 +1,20 @@
+package cn.com.yusys.consumer.model;
+
+import lombok.Data;
+
+/**
+ * Task payload holding the information that describes one task.
+ */
+@Data
+public class Task {
+
+    /**
+     * Path of the file associated with the task.
+     */
+    private String filePath;
+
+    /**
+     * Task ID.
+     */
+    private String taskId;
+}

+ 88 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/ParseServiceClient.java

@@ -0,0 +1,88 @@
+package cn.com.yusys.consumer.util;
+
+import cn.com.yusys.consumer.model.Task;
+import cn.com.yusys.consumer.util.response.ExecuteResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * 解析服务实例实例接口调用工具
+ */
+@Slf4j
+@Component
+public class ParseServiceClient {
+
+
+    private final WebClient webClient;
+
+    @Value("${url.execute}:\"http://127.0.0.1:8083//api/manager/parse\"")
+    private String executeUrl ;
+
+    // 通过构造函数注入WebClient.Builder,利用Spring自动配置
+    public ParseServiceClient(WebClient.Builder webClientBuilder) {
+        this.webClient = webClientBuilder
+                .codecs(config -> config.defaultCodecs().maxInMemorySize(1024 * 1024))
+                .build();
+    }
+
+    /**
+     * 调用接口执行任务
+     * @param taskData 任务数据
+     * @return 执行结果响应
+     */
+    public ExecuteResponse executeTask(Task taskData) {
+        String executeUrl = "http://127.0.0.1:8083//api/manager/parse";
+        try {
+            ExecuteResponse response = webClient.post()
+                    .uri(executeUrl)
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .bodyValue(taskData)
+                    .retrieve()
+                    .onStatus(HttpStatus::isError, clientResponse ->
+                            Mono.error(new WebClientResponseException(
+                                    "执行接口返回异常状态码",
+                                    clientResponse.statusCode().value(),
+                                    clientResponse.statusCode().getReasonPhrase(),
+                                    null, null, null)))
+                    .bodyToMono(ExecuteResponse.class)
+                    .timeout(java.time.Duration.ofSeconds(30))
+                    .block();
+
+            if (response != null && 200 == response.getCode()) {
+                log.debug("任务执行成功,响应:{}", response);
+                return response;
+            } else {
+                log.warn("任务执行返回失败,响应:{}", response);
+                return createErrorResponse("接口返回非200响应");
+            }
+        } catch (WebClientResponseException e) {
+            log.error("任务执行失败,HTTP状态码:{}", e.getRawStatusCode(), e);
+            return createErrorResponse(String.format("HTTP请求失败,状态码:%d", e.getRawStatusCode()));
+        } catch (Exception e) {
+            log.error("任务执行异常", e);
+            return createErrorResponse(e.getMessage());
+        }
+    }
+
+
+
+    /**
+     * 创建错误响应
+     * @param errorMessage 错误消息
+     * @return ExecuteResponse 错误响应
+     */
+    private ExecuteResponse createErrorResponse(String errorMessage) {
+        ExecuteResponse response = new ExecuteResponse();
+        response.setCode(500);
+        response.setMessage(errorMessage);
+        return response;
+    }
+
+
+}

+ 24 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/ExecuteResponse.java

@@ -0,0 +1,24 @@
+package cn.com.yusys.consumer.util.response;
+
+import lombok.Data;
+
+/**
+ * Response returned by the task execute call.
+ */
+@Data
+public class ExecuteResponse {
+    /**
+     * Response code.
+     */
+    private Integer code;
+
+    /**
+     * Response message.
+     */
+    private String message;
+
+    /**
+     * Response data.
+     */
+    private Object data;
+}

+ 40 - 0
schedule-consumer/src/main/java/cn/com/yusys/consumer/util/response/InstanceStatusResponse.java

@@ -0,0 +1,40 @@
+package cn.com.yusys.consumer.util.response;
+
+import lombok.Data;
+
+/**
+ * Response describing an instance's status.
+ */
+@Data
+public class InstanceStatusResponse {
+    /**
+     * Response code.
+     */
+    private Integer code;
+
+    /**
+     * Response message.
+     */
+    private String message;
+
+    /**
+     * Status data.
+     */
+    private StatusData data;
+
+    /**
+     * Nested class holding the status data.
+     */
+    @Data
+    public static class StatusData {
+        /**
+         * Status.
+         */
+        private String status;
+
+        /**
+         * Additional status information.
+         */
+        private Object info;
+    }
+}

+ 4 - 1
schedule-consumer/src/main/resources/application.yml

@@ -16,4 +16,7 @@ spring:
 # 可选:在配置中定义 Topic 列表,方便管理
 kafka:
   topics:
-    listen: test-topic
+    listen: schedule-topic
+
+url:
+  execute: "http://127.0.0.1:8083//api/manager/parse"

+ 5 - 0
schedule-manager/pom.xml

@@ -55,6 +55,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-webflux</artifactId>
         </dependency>
+        <!-- Spring Boot 校验启动器(自动包含 jakarta.validation 核心包) -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-validation</artifactId>
+        </dependency>
 
         <!-- SpringBoot测试核心依赖 -->
         <dependency>

+ 66 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/controller/ManagerController.java

@@ -0,0 +1,66 @@
+package cn.com.yusys.manager.controller;
+
+import cn.com.yusys.manager.model.ExecuteResponse;
+import cn.com.yusys.manager.model.Task;
+import cn.com.yusys.manager.service.InstanceMonitorService;
+import cn.com.yusys.manager.model.InstanceManagementResponse;
+import cn.com.yusys.manager.model.InstanceConfigRequest;
+import cn.com.yusys.manager.model.TaskLogResponse;
+import cn.com.yusys.manager.service.TaskLogService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import javax.validation.Valid;
+
+/**
+ * Scheduling manager controller; exposes the endpoints under /api/manager.
+ */
+@RestController
+@RequestMapping("/api/manager")
+public class ManagerController {
+
+    @Autowired
+    private InstanceMonitorService parserService;
+
+    @Autowired
+    private TaskLogService taskLogService;
+
+    /**
+     * Receives a task and delegates it to the instance monitor service for parsing.
+     * @param request task request object
+     * @return task execution result
+     */
+    @PostMapping("/parse")
+    public ExecuteResponse executeParseTask(@RequestBody Task request) {
+        return parserService.processMultimodalTask(request);
+    }
+
+    /**
+     * Returns the instance management information.
+     * @return instance management information
+     */
+    @GetMapping("/instances")
+    public InstanceManagementResponse getInstanceManagementInfo() {
+        return parserService.getInstanceManagementInfo();
+    }
+
+    /**
+     * Updates the instance configuration; the request body is bean-validated (@Valid).
+     * @param request instance configuration request
+     * @return operation result
+     */
+    @PostMapping("/instances/config")
+    public InstanceManagementResponse updateInstanceConfig( @Valid @RequestBody InstanceConfigRequest request) {
+        return parserService.updateInstanceConfig(request);
+    }
+
+    /**
+     * Looks up the logs of a task.
+     * @param taskId task ID
+     * @return task logs wrapped in a success response
+     */
+    @GetMapping("/tasks/{taskId}/logs")
+    public TaskLogResponse getTaskLogs(@PathVariable String taskId) {
+        return TaskLogResponse.success(taskId, taskLogService.getTaskLogs(taskId));
+    }
+}

+ 32 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/ExecuteResponse.java

@@ -0,0 +1,32 @@
+package cn.com.yusys.manager.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Execution result response.
+ * Wraps the return value of the API endpoints: status code, message and data.
+ * NOTE(review): no field is final or @NonNull, so @RequiredArgsConstructor presumably
+ * yields a no-argument constructor here; @NoArgsConstructor may state that intent more clearly.
+ */
+@Data
+@AllArgsConstructor
+@RequiredArgsConstructor
+public  class ExecuteResponse {  // execution result response
+    private Integer code;  // response status code
+    private String message;  // response message
+    private Object data;  // response data
+
+
+
+    // Success response: code 200, message "success", carrying the given data.
+    public static ExecuteResponse success(Object data) {
+        return new ExecuteResponse(200, "success", data);
+    }
+    // Failure response with the default code 500 and no data.
+    public static ExecuteResponse fail(String message) {
+        return new ExecuteResponse(500, message, null);
+    }
+
+    // Failure response with a caller-chosen code and no data.
+    public static ExecuteResponse fail(Integer code,String message) {
+        return new ExecuteResponse(code, message, null);
+    }
+
+}

+ 32 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceConfigRequest.java

@@ -0,0 +1,32 @@
+
+package cn.com.yusys.manager.model;
+
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import lombok.Data;
+
+
+
+/**
+ * Request entity for updating the instance configuration.
+ */
+@Data
+public class InstanceConfigRequest {
+
+    /**
+     * Minimum number of active instances (required, 1 to 50).
+     */
+    @NotNull(message = "最小活跃实例数不能为空")
+    @Min(value = 1, message = "最小活跃实例数不能小于1")
+    @Max(value = 50, message = "最小活跃实例数不能大于50")
+    private Integer minActiveInstance;
+
+    /**
+     * Maximum number of active instances (required, 1 to 100).
+     */
+    @NotNull(message = "最大活跃实例数不能为空")
+    @Min(value = 1, message = "最大活跃实例数不能小于1")
+    @Max(value = 100, message = "最大活跃实例数不能大于100")
+    private Integer maxActiveInstance;
+}

+ 118 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/InstanceManagementResponse.java

@@ -0,0 +1,118 @@
+
+package cn.com.yusys.manager.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Response entity for the instance management endpoints.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class InstanceManagementResponse {
+    /**
+     * Response code.
+     */
+    private Integer code;
+
+    /**
+     * Response message.
+     */
+    private String message;
+
+
+    // Aggregated instance counts plus per-instance details.
+    private InstanceManagementData data;
+
+    /**
+     * Details of a single instance.
+     */
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class InstanceDetail {
+        /**
+         * Container ID.
+         */
+        private String containerId;
+
+        /**
+         * Instance IP.
+         */
+        private String ip;
+
+        /**
+         * Instance port.
+         */
+        private Integer port;
+
+        /**
+         * Instance status (0 = idle, 1 = running, 2 = error).
+         */
+        private Integer status;
+
+        /**
+         * CPU usage.
+         */
+        private Double cpuUsage;
+
+        /**
+         * Memory usage.
+         */
+        private Double memoryUsage;
+
+        /**
+         * GPU usage.
+         */
+        private Double gpuUsage;
+
+        /**
+         * GPU memory in use.
+         */
+        private Double gpuMemory;
+
+        /**
+         * Time of the last heartbeat.
+         */
+        private Long lastHeartbeatTime;
+    }
+
+    /**
+     * Builds a success response (code 200) carrying the given data.
+     */
+    public static InstanceManagementResponse success(InstanceManagementData data) {
+        return InstanceManagementResponse.builder()
+                .code(200)
+                .message("操作成功")
+                .data(data)
+                .build();
+    }
+
+    /**
+     * Builds a failure response (code 500) with the given message and no data.
+     */
+    public static InstanceManagementResponse fail(String message) {
+        return InstanceManagementResponse.builder()
+                .code(500)
+                .message(message)
+                .build();
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class InstanceManagementData {
+        private Integer totalInstances;      // total number of instances
+        private Integer idleInstances;       // number of idle instances
+        private Integer runningInstances;    // number of running instances
+        private Integer errorInstances;      // number of instances in error state
+        private List<InstanceDetail> instanceDetails; // per-instance details
+    }
+}

+ 22 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/Task.java

@@ -0,0 +1,22 @@
+package cn.com.yusys.manager.model;
+
+
+import lombok.Data;
+
+/**
+ * Task object received by the manager.
+ */
+@Data
+public class Task {
+
+    /**
+     * Path of the file associated with the task.
+     */
+    private String filePath;
+
+
+    /**
+     * Task ID.
+     */
+    private String taskId;
+}

+ 60 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/model/TaskLogResponse.java

@@ -0,0 +1,60 @@
+
+package cn.com.yusys.manager.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Response entity for task log queries.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskLogResponse {
+    /**
+     * Response code.
+     */
+    private Integer code;
+
+    /**
+     * Response message.
+     */
+    private String message;
+
+    /**
+     * Task ID.
+     */
+    private String taskId;
+
+    /**
+     * List of log entries.
+     */
+    private List<String> logs;
+
+    /**
+     * Builds a success response (code 200) for the given task and its logs.
+     */
+    public static TaskLogResponse success(String taskId, List<String> logs) {
+        return TaskLogResponse.builder()
+                .code(200)
+                .message("查询成功")
+                .taskId(taskId)
+                .logs(logs)
+                .build();
+    }
+
+    /**
+     * Builds a failure response (code 500) with the given message.
+     */
+    public static TaskLogResponse fail(String message) {
+        return TaskLogResponse.builder()
+                .code(500)
+                .message(message)
+                .build();
+    }
+}

+ 208 - 4
schedule-manager/src/main/java/cn/com/yusys/manager/service/InstanceMonitorService.java

@@ -2,11 +2,15 @@ package cn.com.yusys.manager.service;
 
 import cn.com.yusys.manager.common.ParseInstanceStatusRegistry;
 import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.ExecuteResponse;
 import cn.com.yusys.manager.model.InstanceStatus;
 import cn.com.yusys.manager.model.InstanceStatusResponse;
 import cn.com.yusys.manager.instanceManager.Impl.DockerInstanceManager;
+import cn.com.yusys.manager.model.Task;
+import cn.com.yusys.manager.model.InstanceManagementResponse;
+import cn.com.yusys.manager.model.InstanceConfigRequest;
 import cn.com.yusys.manager.util.ParseInstanceClient;
-import cn.com.yusys.manager.common.PortPool; 
+import cn.com.yusys.manager.common.PortPool;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -18,7 +22,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -50,6 +53,14 @@ public class InstanceMonitorService {
     @Resource
     private PortPool portPool;
 
+    // 注入任务日志服务
+    @Resource
+    private TaskLogService taskLogService;
+
+    // 最大重试次数
+    @org.springframework.beans.factory.annotation.Value("${parser.task.max-retry:3}")
+    private int maxRetry;
+
     @PostConstruct
     public void initParseInstance(){
         log.info("开始初始化解析实例...");
@@ -77,9 +88,9 @@ public class InstanceMonitorService {
     /**
      * 核心监控定时任务:
      * - initialDelay = 30000:首次执行延迟30秒
-     * - fixedRate = 5000:之后每5秒执行一次
+     * - fixedRate = 10000:之后每10秒执行一次
      */
-    @Scheduled(initialDelay = 30000, fixedRate = 5000)
+    @Scheduled(initialDelay = 30000, fixedRate = 10000)
     public void parserInstanceMonitor() {
         Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
         try {
@@ -387,4 +398,197 @@ public class InstanceMonitorService {
             }
         }
     }
+
+    /**
+     * Runs one multimodal parse task synchronously on an idle parser instance.
+     *
+     * Flow: log the task start, pick an idle instance from the active pool, log the
+     * allocation, execute via executeTaskWithRetry, then log completion or failure.
+     *
+     * @param task task to run; its taskId and filePath are read here
+     * @return the parser's response; a fail response with code 300 when no idle instance
+     *         exists; a fail response when an exception is thrown inside the try block
+     *
+     * NOTE(review): the original comment stated that tasks are moved to a failure Topic after
+     * maxRetry retries; no such hand-off is visible in this method - confirm where it happens.
+     * NOTE(review): picking the idle instance and marking it running are separate steps, so
+     * concurrent callers might select the same instance - confirm callers are serialized.
+     */
+   public ExecuteResponse processMultimodalTask(Task task) {
+        Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+        String taskId = task.getTaskId();
+
+        try {
+            // Record the task start.
+            taskLogService.logTaskStart(taskId, task.getFilePath());
+
+            // Check whether an idle parser instance is available.
+            InstanceStatus idleInstance = findIdleInstance(activeInstancePool);
+            if (idleInstance == null) {
+                log.debug("当前无空闲解析实例");
+                taskLogService.logTaskFailure(taskId, "当前无空闲解析实例");
+                return ExecuteResponse.fail(300,"当前无空闲解析实例");
+            }
+
+            // Record which instance was allocated.
+            taskLogService.logInstanceAllocation(taskId, idleInstance.getContainerId());
+
+            // Execute the parse task.
+            ExecuteResponse response = executeTaskWithRetry(idleInstance, task);
+
+            // Record the outcome: complete on code 200, failure otherwise.
+            if (response != null && response.getCode() == 200) {
+                taskLogService.logTaskComplete(taskId, response.getMessage());
+            } else {
+                taskLogService.logTaskFailure(taskId, response != null ? response.getMessage() : "未知错误");
+            }
+
+            return response;
+
+        } catch (Exception e) {
+            log.error("多模态任务解析定时任务执行失败", e);
+            taskLogService.logTaskFailure(taskId, e.getMessage());
+            return ExecuteResponse.fail("多模态任务解析定时任务执行失败");
+        }
+    }
+
+    /**
+     * Returns the first instance in the pool whose status is 0 (idle), or null if none is idle.
+     */
+    private InstanceStatus findIdleInstance(Map<String, InstanceStatus> activeInstancePool) {
+        for (InstanceStatus candidate : activeInstancePool.values()) {
+            // Status 0 means idle.
+            if (candidate.getStatus() == 0) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+
+    /**
+     * Runs the task on the given instance, retrying after a failed response or an exception.
+     *
+     * The instance is marked running (status 1) for the duration and always set back to
+     * idle (status 0) on exit. Up to maxRetry + 1 attempts are made: the first call plus
+     * maxRetry retries.
+     *
+     * @return the first response with code 200, or a fail response once attempts are exhausted
+     */
+    private ExecuteResponse executeTaskWithRetry(InstanceStatus instance, Task task) {
+        String containerId = instance.getContainerId();
+        // Mark the instance as running.
+        instance.setStatus(1);
+
+        try {
+            for (int attempt = 0; attempt <= maxRetry; attempt++) {
+                try {
+                    log.info("开始执行任务,实例:{},重试次数:{}/{},任务内容:{}",
+                            containerId, attempt, maxRetry, task.getFilePath());
+
+                    // Call the parser to execute the task.
+                    ExecuteResponse result = callParser(instance, task);
+
+                    boolean succeeded = result != null && result.getCode() == 200;
+                    if (succeeded) {
+                        log.info("任务执行成功,实例:{},响应:{}", containerId, result);
+                        return result;
+                    }
+                    log.warn("任务执行返回失败,实例:{},响应:{},准备重试", containerId, result);
+                } catch (Exception e) {
+                    log.error("任务执行异常,实例:{},重试次数:{}/{}", containerId, attempt, maxRetry, e);
+                }
+            }
+
+            return ExecuteResponse.fail("任务执行失败,已达最大重试次数");
+        } finally {
+            // Restore the instance to idle.
+            instance.setStatus(0);
+        }
+    }
+
+    /**
+     * Sends the task to the parser instance and returns its response unchanged.
+     * A null or non-200 response is logged as a warning; exceptions are logged and rethrown.
+     */
+    private ExecuteResponse callParser(InstanceStatus instance, Task task) {
+        try {
+            ExecuteResponse result = instanceClient.executeTask(
+                    instance.getIp(),
+                    instance.getPort(),
+                    task);
+
+            boolean succeeded = result != null && result.getCode() == 200;
+            if (!succeeded) {
+                log.warn("调用解析器返回失败,实例:{},响应:{}", instance.getContainerId(), result);
+            }
+            return result;
+        } catch (Exception e) {
+            log.error("调用解析器失败,实例:{}", instance.getContainerId(), e);
+            throw e;
+        }
+    }
+
+    /**
+     * 获取实例管理信息
+     * @return 实例管理信息
+     */
+    public InstanceManagementResponse getInstanceManagementInfo() {
+        try {
+            Map<String, InstanceStatus> activeInstancePool = instancestatusRegistry.getActiveInstancePool();
+            
+            // 统计各状态实例数量
+            long idleCount = activeInstancePool.values().stream()
+                    .filter(status -> status.getStatus() == 0)
+                    .count();
+            long runningCount = activeInstancePool.values().stream()
+                    .filter(status -> status.getStatus() == 1)
+                    .count();
+            long errorCount = activeInstancePool.values().stream()
+                    .filter(status -> status.getStatus() == 2)
+                    .count();
+            
+            // 构建实例详情列表
+            List<InstanceManagementResponse.InstanceDetail> instanceDetails = activeInstancePool.values().stream()
+                    .map(status -> InstanceManagementResponse.InstanceDetail.builder()
+                            .containerId(status.getContainerId())
+                            .ip(status.getIp())
+                            .port(status.getPort())
+                            .status(status.getStatus())
+                            .cpuUsage(status.getCpuUsage())
+                            .memoryUsage(status.getMemoryUsage())
+                            .gpuUsage(status.getGpuUsage())
+                            .gpuMemory(status.getGpuMemory())
+                            .lastHeartbeatTime(status.getLastHeartbeatTime())
+                            .build())
+                    .collect(Collectors.toList());
+            
+            // 构建响应数据
+            InstanceManagementResponse.InstanceManagementData data = InstanceManagementResponse.InstanceManagementData.builder()
+                    .totalInstances(activeInstancePool.size())
+                    .idleInstances((int) idleCount)
+                    .runningInstances((int) runningCount)
+                    .errorInstances((int) errorCount)
+                    .instanceDetails(instanceDetails)
+                    .build();
+            
+            return InstanceManagementResponse.success(data);
+        } catch (Exception e) {
+            log.error("获取实例管理信息失败", e);
+            return InstanceManagementResponse.fail("获取实例管理信息失败: " + e.getMessage());
+        }
+    }
+    
+    /**
+     * Applies new minimum/maximum active-instance limits to the parser configuration.
+     *
+     * @param request instance configuration request
+     * @return the refreshed instance management info on success; a fail response when the
+     *         minimum exceeds the maximum or when the update throws
+     */
+    public InstanceManagementResponse updateInstanceConfig(InstanceConfigRequest request) {
+        try {
+            int lowerBound = request.getMinActiveInstance();
+            int upperBound = request.getMaxActiveInstance();
+
+            // The minimum must not exceed the maximum.
+            boolean rangeValid = lowerBound <= upperBound;
+            if (!rangeValid) {
+                return InstanceManagementResponse.fail("最小活跃实例数不能大于最大活跃实例数");
+            }
+
+            // Apply the new limits.
+            parserConfig.MIN_ACTIVE_INSTANCE = lowerBound;
+            parserConfig.MAX_ACTIVE_INSTANCE = upperBound;
+
+            log.info("实例配置已更新:最小活跃实例数={}, 最大活跃实例数={}",
+                    lowerBound, upperBound);
+
+            // Report the instance management info after the update.
+            return getInstanceManagementInfo();
+        } catch (Exception e) {
+            log.error("更新实例配置失败", e);
+            return InstanceManagementResponse.fail("更新实例配置失败: " + e.getMessage());
+        }
+    }
+
 }

+ 177 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/service/TaskLogService.java

@@ -0,0 +1,177 @@
+
+package cn.com.yusys.manager.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Task log service.
+ *
+ * <p>Keeps one append-only text file per task, named {@code <taskId>.log}, inside
+ * {@code logDirectory}. It offers helpers for recording the individual lifecycle stages
+ * of a task and a query method that reads a task's log back line by line.
+ *
+ * <p>Log files are written and read as UTF-8 so that non-ASCII messages survive a
+ * round trip regardless of the platform default charset.
+ */
+@Slf4j
+@Service
+public class TaskLogService {
+
+    // Directory holding the per-task log files (property task.log.directory, default ./logs/tasks)
+    @Value("${task.log.directory:./logs/tasks}")
+    private String logDirectory;
+
+    // Suffix appended to the task ID to form the log file name
+    private static final String LOG_FILE_SUFFIX = ".log";
+
+    // Timestamp format used at the start of every log entry
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+    /**
+     * Appends one entry of the form {@code [timestamp] [stage] [taskId] message} to the
+     * task's log file.
+     *
+     * <p>This method never throws: any failure (invalid task ID, I/O error, ...) is
+     * reported through the application logger instead.
+     *
+     * @param taskId  task ID; must be a plain file-name component
+     * @param stage   task stage
+     * @param message log message
+     */
+    public void logTask(String taskId, String stage, String message) {
+        try {
+            // Validate the ID and build the target path before touching the file system
+            String logFilePath = getLogFilePath(taskId);
+
+            // Make sure the log directory exists
+            ensureLogDirectoryExists();
+
+            // Build the log entry
+            String logEntry = String.format("[%s] [%s] [%s] %s%n",
+                    LocalDateTime.now().format(DATE_TIME_FORMATTER),
+                    stage,
+                    taskId,
+                    message);
+
+            // Append it to the task's log file
+            appendToFile(logFilePath, logEntry);
+
+            log.info("任务日志已记录:taskId={}, stage={}, message={}", taskId, stage, message);
+        } catch (Exception e) {
+            log.error("记录任务日志失败:taskId={}, stage={}, message={}", taskId, stage, message, e);
+        }
+    }
+
+    /**
+     * Records the start of a task (stage {@code TASK_START}).
+     *
+     * @param taskId   task ID
+     * @param filePath path of the file the task works on
+     */
+    public void logTaskStart(String taskId, String filePath) {
+        logTask(taskId, "TASK_START", String.format("任务开始执行,文件路径:%s", filePath));
+    }
+
+    /**
+     * Records the completion of a task (stage {@code TASK_COMPLETE}).
+     *
+     * @param taskId task ID
+     * @param result execution result
+     */
+    public void logTaskComplete(String taskId, String result) {
+        logTask(taskId, "TASK_COMPLETE", String.format("任务执行完成,结果:%s", result));
+    }
+
+    /**
+     * Records the failure of a task (stage {@code TASK_FAILURE}).
+     *
+     * @param taskId       task ID
+     * @param errorMessage error message
+     */
+    public void logTaskFailure(String taskId, String errorMessage) {
+        logTask(taskId, "TASK_FAILURE", String.format("任务执行失败:%s", errorMessage));
+    }
+
+    /**
+     * Records that a parser instance was allocated to a task (stage {@code INSTANCE_ALLOCATION}).
+     *
+     * @param taskId     task ID
+     * @param instanceId instance ID
+     */
+    public void logInstanceAllocation(String taskId, String instanceId) {
+        logTask(taskId, "INSTANCE_ALLOCATION", String.format("分配解析实例:%s", instanceId));
+    }
+
+    /**
+     * Records that a parser instance was released (stage {@code INSTANCE_RELEASE}).
+     *
+     * @param taskId     task ID
+     * @param instanceId instance ID
+     */
+    public void logInstanceRelease(String taskId, String instanceId) {
+        logTask(taskId, "INSTANCE_RELEASE", String.format("释放解析实例:%s", instanceId));
+    }
+
+    /**
+     * Records a call to the parser (stage {@code PARSER_CALL}).
+     *
+     * @param taskId     task ID
+     * @param instanceId instance ID
+     * @param message    message
+     */
+    public void logParserCall(String taskId, String instanceId, String message) {
+        logTask(taskId, "PARSER_CALL", String.format("调用解析器,实例ID:%s,消息:%s", instanceId, message));
+    }
+
+    /**
+     * Reads back all log lines recorded for a task.
+     *
+     * @param taskId task ID
+     * @return the lines of the task's log file; an empty list when the file does not
+     *         exist, the task ID is invalid, or reading fails
+     */
+    public List<String> getTaskLogs(String taskId) {
+        try {
+            String logFilePath = getLogFilePath(taskId);
+            Path path = Paths.get(logFilePath);
+
+            if (!Files.exists(path)) {
+                log.warn("任务日志文件不存在:{}", logFilePath);
+                return new ArrayList<>();
+            }
+
+            // Same charset as appendToFile, stated explicitly
+            return Files.readAllLines(path, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            log.error("查询任务日志失败:taskId={}", taskId, e);
+            return new ArrayList<>();
+        }
+    }
+
+    /**
+     * Builds the log file path for a task.
+     *
+     * <p>The task ID ends up in a file name, so it must be a single path component.
+     * IDs that are blank, absolute, or contain directory separators (for example
+     * {@code ../../etc/passwd}) are rejected to keep every access inside the log directory.
+     *
+     * @param taskId task ID
+     * @return log file path
+     * @throws IllegalArgumentException if the task ID is blank or is not a plain file name
+     */
+    private String getLogFilePath(String taskId) {
+        if (taskId == null || taskId.trim().isEmpty()) {
+            throw new IllegalArgumentException("任务ID不能为空");
+        }
+
+        Path fileName = Paths.get(taskId + LOG_FILE_SUFFIX);
+        // A root component or more than one name element means the ID tries to address
+        // something other than a file directly inside the log directory.
+        if (fileName.getRoot() != null || fileName.getNameCount() != 1) {
+            throw new IllegalArgumentException("非法的任务ID: " + taskId);
+        }
+
+        return Paths.get(logDirectory).resolve(fileName).toString();
+    }
+
+    /**
+     * Creates the log directory (including missing parents) when it does not exist yet.
+     *
+     * @throws IOException if the directory cannot be created
+     */
+    private void ensureLogDirectoryExists() throws IOException {
+        Path path = Paths.get(logDirectory);
+        if (!Files.exists(path)) {
+            Files.createDirectories(path);
+            log.info("创建任务日志目录:{}", logDirectory);
+        }
+    }
+
+    /**
+     * Appends content to a file, creating the file when necessary.
+     *
+     * <p>Uses {@code Files.write} rather than {@code PrintWriter}: the latter never throws
+     * {@code IOException}, so a failed write would go unnoticed by the caller.
+     *
+     * @param filePath file path
+     * @param content  content to append, encoded as UTF-8
+     * @throws IOException if writing to the file fails
+     */
+    private void appendToFile(String filePath, String content) throws IOException {
+        Files.write(Paths.get(filePath),
+                content.getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND);
+    }
+}

+ 53 - 0
schedule-manager/src/main/java/cn/com/yusys/manager/util/ParseInstanceClient.java

@@ -1,7 +1,9 @@
 package cn.com.yusys.manager.util;
 
 import cn.com.yusys.manager.config.ParserConfig;
+import cn.com.yusys.manager.model.ExecuteResponse;
 import cn.com.yusys.manager.model.InstanceStatusResponse;
+import cn.com.yusys.manager.model.Task;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -101,4 +103,55 @@ public class ParseInstanceClient {
 
         return errorResponse;
     }
+
+    /**
+     * 调用Python实例的/execute接口执行任务
+     * @param instanceIp 实例IP
+     * @param instancePort 实例端口
+     * @param taskData 任务数据
+     * @return 执行结果响应
+     */
+    public ExecuteResponse executeTask(
+            String instanceIp, Integer instancePort, Task taskData) {
+        // 1. 参数校验
+        if (instanceIp == null || instancePort == null) {
+            log.warn("实例IP或端口为空,跳过任务执行");
+            return ExecuteResponse.fail("实例IP或端口为空");
+        }
+
+        String executeUrl = String.format("http://%s:%d%s", instanceIp, instancePort,"/execute");
+        try {
+            ExecuteResponse response = webClient.post()
+                    .uri(executeUrl)
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .bodyValue(taskData)
+                    .retrieve()
+                    .onStatus(HttpStatus::isError, clientResponse ->
+                            Mono.error(new WebClientResponseException(
+                                    "执行接口返回异常状态码",
+                                    clientResponse.statusCode().value(),
+                                    clientResponse.statusCode().getReasonPhrase(),
+                                    null, null, null)))
+                    .bodyToMono(ExecuteResponse.class)
+                    .timeout(java.time.Duration.ofSeconds(30))
+                    .block();
+
+            // 2. 响应校验
+            if (response != null && 200 == response.getCode()) {
+                log.debug("实例{}:{}任务执行成功", instanceIp, instancePort);
+                return response;
+            } else {
+                log.warn("实例{}:{}任务执行返回失败,响应:{}", instanceIp, instancePort, response);
+                return ExecuteResponse.fail("接口返回非200响应");
+            }
+        } catch (WebClientResponseException e) {
+            log.error("实例{}:{}任务执行失败,HTTP状态码:{}", instanceIp, instancePort, e.getRawStatusCode(), e);
+            return ExecuteResponse.fail(String.format("HTTP请求失败,状态码:%d", e.getRawStatusCode()));
+        } catch (Exception e) {
+            log.error("实例{}:{}任务执行异常", instanceIp, instancePort, e);
+            return ExecuteResponse.fail(e.getMessage());
+        }
+    }
+
+
 }

+ 3 - 1
schedule-manager/src/main/resources/application.yml

@@ -17,6 +17,8 @@ spring:
 kafka:
   topics:
     listen: test-topic
+    task: task-topic
+    failed: task-failed
 
 docker:
   host: tcp://127.0.0.1:2375 # Docker Daemon地址
@@ -34,7 +36,7 @@ parser:
     min-active-instance: 3
 
     # 最大活跃实例数,资源上限
-    max-active-instance: 10
+    max-active-instance: 4
 
     # 任务积压阈值,触发临时扩容的 Kafka Lag 阈值 (> 100)
     task-backlog-threshold: 100

+ 1 - 1
schedule-producer/src/main/resources/application.yml

@@ -12,7 +12,7 @@ spring:
       acks: all
       retries: 3
     topics:
-      test-topic:
+      schedule-topic:
         partitions: 6