Просмотр исходного кода

加入解析器代码至仓库

zsh 3 недели назад
Родитель
Commit
c083ce536e

+ 1 - 0
.gitignore

@@ -5,6 +5,7 @@ target/
 !**/src/main/**/target/
 !**/src/test/**/target/
 .kotlin
+**.log
 
 ### IntelliJ IDEA ###
 .idea/modules.xml

+ 55 - 0
parser/.dockerignore

@@ -0,0 +1,55 @@
+# Git相关
+.git
+.gitignore
+.gitattributes
+
+# Python相关
+__pycache__
+*.pyc
+*.pyo
+*.pyd
+.Python
+*.egg-info/
+dist/
+build/
+*.egg
+.pytest_cache/
+.coverage
+htmlcov/
+
+# 虚拟环境
+venv/
+env/
+ENV/
+.venv
+
+# IDE相关
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+.DS_Store
+
+# 输出文件
+output/
+logs/
+*.log
+
+# 临时文件
+tmp/
+temp/
+*.tmp
+
+# 文档
+*.md
+doc/
+docs/
+
+# 开发工具
+.editorconfig
+.pre-commit-config.yaml
+Makefile
+
+# 不需要的目录
+.github/

+ 342 - 0
parser/core/router.py

@@ -0,0 +1,342 @@
+from typing import Optional, Type
+from abc import ABC, abstractmethod
+import fitz  # PyMuPDF
+from utils.mime_detector import MimeDetector
+from utils.logger import log
+from models.result import ParseResult
+
+
class Parser(ABC):
    """Abstract base class that all concrete file parsers implement."""

    @abstractmethod
    async def parse(self, file_path: str) -> ParseResult:
        """
        Parse the file at *file_path*.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: Unified parse result (content, metadata, file type, tables).
        """
        pass

    def _get_file_size(self, file_path: str) -> int:
        """
        Return the file size in bytes.

        Args:
            file_path: Path of the file.

        Returns:
            int: Size in bytes, or 0 when the file cannot be stat'ed
            (missing file, permission error, ...).
        """
        import os
        try:
            return os.path.getsize(file_path)
        except Exception:
            return 0
+
+
class ParserFactory:
    """Routes files to the appropriate parser and aggregates parsing statistics."""

    def __init__(self):
        self.mime_detector = MimeDetector()
        self.parsers = {}            # explicit MIME-type -> parser-class registrations
        self.parser_instances = {}   # cache of lazily imported parser instances
        # Cumulative statistics over every parse() call on this factory instance.
        self.stats = {
            'total_files': 0,
            'total_size': 0,
            'text_files': 0,
            'text_size': 0,
            'image_files': 0,
            'image_size': 0,
            'audio_files': 0,
            'audio_size': 0,
            'video_files': 0,
            'video_size': 0,
            'pdf_files': 0,
            'pdf_size': 0,
            'office_files': 0,
            'office_size': 0,
            'total_time': 0,
            'successful_files': 0,
            'failed_files': 0
        }

    def register_parser(self, mime_type: str, parser_class: "Type[Parser]"):
        """
        Register a parser class for a MIME type.

        Args:
            mime_type: MIME type the parser handles.
            parser_class: Parser class to associate with it.
        """
        self.parsers[mime_type] = parser_class

    def _get_or_create(self, class_name: str, module_path: str) -> "Parser":
        """
        Return the cached parser instance named *class_name*, importing
        *module_path* and instantiating the class on first use.

        Centralizes the lazy-import/caching pattern that was previously
        duplicated across every routing branch.
        """
        if class_name not in self.parser_instances:
            import importlib
            parser_cls = getattr(importlib.import_module(module_path), class_name)
            self.parser_instances[class_name] = parser_cls()
        return self.parser_instances[class_name]

    async def get_parser(self, file_path: str) -> "Parser":
        """
        Choose a parser based on the file's detected MIME type.

        Args:
            file_path: Path of the file.

        Returns:
            Parser: A (cached) parser instance.

        Raises:
            ValueError: When the MIME type is not supported.
        """
        log.info(f"开始获取解析器,文件路径: {file_path}")
        # 1. Detect the file's MIME type.
        mime_type = self.mime_detector.detect(file_path)
        log.info(f"文件MIME类型: {mime_type}")

        # 2. First-level routing by MIME major type.
        if mime_type.startswith("text/"):
            log.info("检测到文本文件,使用TextParser")
            return self._get_or_create("TextParser", "parsers.text_parser")
        elif mime_type.startswith("image/"):
            log.info("检测到图片文件,使用VisualDocParser")
            return self._get_or_create("VisualDocParser", "parsers.visual_parser")
        elif mime_type.startswith("audio/"):
            log.info("检测到音频文件,使用AudioParser")
            return self._get_or_create("AudioParser", "parsers.audio_parser")
        elif mime_type.startswith("video/"):
            log.info("检测到视频文件,使用VideoParser")
            return self._get_or_create("VideoParser", "parsers.video_parser")
        elif mime_type == "application/pdf":
            # Second-level routing: PDFs need a scanned-vs-native check.
            log.info("检测到PDF文件,进入特殊路由")
            return await self._route_pdf(file_path)
        elif "openxmlformats" in mime_type or mime_type == "application/msword":
            # Office files (docx/xlsx/pptx and legacy doc).
            log.info(f"检测到Office文件,MIME类型: {mime_type},使用NativeDocParser")
            return await self._route_office(file_path, mime_type)
        else:
            log.error(f"不支持的文件类型: {mime_type}")
            # ValueError (subclass of Exception) keeps existing broad handlers working.
            raise ValueError(f"不支持的文件类型: {mime_type}")

    async def _route_pdf(self, file_path: str) -> "Parser":
        """
        Second-level PDF routing: scanned PDFs go to the visual (OCR)
        parser, born-digital PDFs to the native text extractor.
        """
        if self._is_scanned_pdf(file_path):
            log.info("PDF为扫描件,使用VisualDocParser")
            return self._get_or_create("VisualDocParser", "parsers.visual_parser")
        log.info("PDF为原生文档,使用NativeDocParser")
        return self._get_or_create("NativeDocParser", "parsers.native_parser")

    async def _route_office(self, file_path: str, mime_type: str) -> "Parser":
        """
        Office routing: every Office format currently shares NativeDocParser.
        *mime_type* is accepted for future format-specific routing.
        """
        return self._get_or_create("NativeDocParser", "parsers.native_parser")

    def _is_scanned_pdf(self, file_path: str) -> bool:
        """
        Heuristically decide whether a PDF is a scan by extracting the text
        of its first 3 pages and counting alphanumeric/whitespace characters.

        Returns:
            bool: True when fewer than 50 valid characters were found, or
            when extraction failed (the OCR path is the safer fallback).
        """
        try:
            doc = fitz.open(file_path)
            try:
                text_content = ""
                for page_num in range(min(3, len(doc))):
                    text_content += doc[page_num].get_text()
            finally:
                # Close the document even if text extraction raises.
                doc.close()

            valid_chars = sum(1 for c in text_content if c.isalnum() or c.isspace())
            log.info(f"PDF前3页有效字符数: {valid_chars}")
            return valid_chars < 50
        except Exception as e:
            log.error(f"PDF文本提取失败: {str(e)}")
            return True

    def _update_type_stats(self, file_type: str, file_size: int):
        """Bump the per-category file/size counters for *file_type*."""
        if file_type.startswith('text'):
            key = 'text'
        elif file_type.startswith('image') or file_type == 'visual':
            key = 'image'
        elif file_type.startswith('audio'):
            key = 'audio'
        elif file_type.startswith('video'):
            key = 'video'
        elif file_type in ('pdf', 'pdf_scanned'):
            key = 'pdf'
        elif file_type in ('office', 'docx', 'doc', 'xlsx', 'pptx'):
            # Bug fix: NativeDocParser reports 'docx'/'doc'/'xlsx'/'pptx', so a
            # plain 'office' comparison never matched and Office stats stayed 0.
            key = 'office'
        else:
            return
        self.stats[f'{key}_files'] += 1
        self.stats[f'{key}_size'] += file_size

    async def parse(self, file_path: str) -> "ParseResult":
        """
        Entry point: route the file to a parser, run it, record statistics.

        Never raises: failures are reported as a ParseResult with
        file_type == "error" and the message in content/metadata.
        """
        import time
        import os

        start_time = time.time()
        file_size = 0
        try:
            file_size = os.path.getsize(file_path)
        except OSError:
            pass  # size stays 0; parsing may still fail later with a clearer error

        log.info(f"开始解析文件: {file_path}, 文件大小: {file_size / (1024 * 1024):.2f} MB")

        try:
            parser = await self.get_parser(file_path)
            log.info(f"获取到解析器: {parser.__class__.__name__}")

            result = await parser.parse(file_path)
            elapsed_time = time.time() - start_time

            # Update overall statistics.
            self.stats['total_files'] += 1
            self.stats['total_size'] += file_size
            self.stats['total_time'] += elapsed_time
            self.stats['successful_files'] += 1
            self._update_type_stats(result.file_type, file_size)

            # Result summary logging.
            log.info(f"文件解析完成,耗时: {elapsed_time:.2f} 秒")
            log.info(f"文件类型: {result.file_type}")
            log.info(f"解析内容长度: {len(result.content)} 字符")
            log.info(f"元数据: {result.metadata}")
            if result.tables:
                log.info(f"提取到表格数量: {len(result.tables)}")

            return result
        except Exception as e:
            elapsed_time = time.time() - start_time

            self.stats['total_files'] += 1
            self.stats['total_size'] += file_size
            self.stats['total_time'] += elapsed_time
            self.stats['failed_files'] += 1

            log.error(f"解析失败: {str(e)}, 耗时: {elapsed_time:.2f} 秒")
            # Surface the failure as an error result rather than raising.
            return ParseResult(
                content=f"解析失败: {str(e)}",
                metadata={"error": str(e)},
                file_type="error"
            )

    def generate_performance_report(self) -> str:
        """
        Render a Markdown performance report from the accumulated stats.

        Safe to call before anything has been parsed: every division is
        guarded against zero totals.

        Returns:
            str: The report as Markdown text.
        """
        stats = self.stats

        total_files = stats['total_files']
        total_size = stats['total_size']
        total_time = stats['total_time']
        successful_files = stats['successful_files']
        failed_files = stats['failed_files']

        def mb(size: int) -> float:
            """Bytes -> megabytes."""
            return size / (1024 * 1024)

        def ratio(size: int) -> float:
            """Share of *size* in the grand total, in percent (0 when empty)."""
            return (size / total_size * 100) if total_size > 0 else 0.0

        def speed(size: int) -> float:
            """Average throughput in MB/s over the total wall-clock time.

            Bug fix: the per-category speed lines previously divided by
            total_time unconditionally and crashed when it was zero.
            """
            return (mb(size) / total_time) if total_time > 0 else 0.0

        total_size_mb = mb(total_size)

        report = f"""# 解析性能报告

## 总体情况
- 总解析文件数: {total_files}
- 成功解析: {successful_files}
- 解析失败: {failed_files}
- 总文件大小: {total_size_mb:.2f} MB
- 总耗时: {total_time:.2f} 秒
- 平均解析速度: {speed(total_size):.2f} MB/秒

## 文件类型分布
- 文本文件: {stats['text_files']} 个, {mb(stats['text_size']):.2f} MB, 占比: {ratio(stats['text_size']):.2f}%
- 图片文件: {stats['image_files']} 个, {mb(stats['image_size']):.2f} MB, 占比: {ratio(stats['image_size']):.2f}%
- 音频文件: {stats['audio_files']} 个, {mb(stats['audio_size']):.2f} MB, 占比: {ratio(stats['audio_size']):.2f}%
- 视频文件: {stats['video_files']} 个, {mb(stats['video_size']):.2f} MB, 占比: {ratio(stats['video_size']):.2f}%
- PDF文件: {stats['pdf_files']} 个, {mb(stats['pdf_size']):.2f} MB, 占比: {ratio(stats['pdf_size']):.2f}%
- Office文件: {stats['office_files']} 个, {mb(stats['office_size']):.2f} MB, 占比: {ratio(stats['office_size']):.2f}%

## 性能分析
- 文本类平均解析速度: {speed(stats['text_size']):.2f} MB/秒 (如果有文本文件)
- 图片类平均解析速度: {speed(stats['image_size']):.2f} MB/秒 (如果有图片文件)
- 音频类平均解析速度: {speed(stats['audio_size']):.2f} MB/秒 (如果有音频文件)
- 视频类平均解析速度: {speed(stats['video_size']):.2f} MB/秒 (如果有视频文件)
"""

        return report

+ 38 - 26
parser/dockerfile

@@ -1,34 +1,46 @@
-# 基础镜像:使用Python 3.12-alpine
-FROM python:3.12-alpine
+# 基础镜像:使用Python官方镜像(更稳定)
+FROM python:3.11-slim
 
 # 设置工作目录
 WORKDIR /app
 
-
+# 环境变量
+ENV PYTHONIOENCODING=utf-8 \
+    PIP_DEFAULT_TIMEOUT=120 \
+    PIP_DISABLE_PIP_VERSION_CHECK=1 \
+    DEBIAN_FRONTEND=noninteractive \
+    PYTHONUNBUFFERED=1
+
+# ========== 系统依赖安装 ==========
+RUN sed -i 's|deb.debian.org|mirrors.ustc.edu.cn|g' /etc/apt/sources.list.d/debian.sources && \
+    apt-get update && \
+    apt-get install -y --no-install-recommends \
+    libglib2.0-0 \
+    libsm6 \
+    libxext6 \
+    libxrender-dev \
+    libgomp1 \
+    libgthread-2.0-0 \
+    libgtk-3-0 \
+    libgstreamer1.0-0 \
+    libgstreamer-plugins-base1.0-0 \
+    ffmpeg \
+    ca-certificates \
+    && rm -rf /var/lib/apt/lists/*
+
+# ========== 配置pip源(国内加速) ==========
+RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && \
+    pip config set global.trusted-host mirrors.aliyun.com
+
+# ========== 安装Python依赖(包括所有系统库的Python包装) ==========
 COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# ========== 复制应用代码 ==========
+COPY . .
 
-# 设置环境变量
-ENV PYTHONUNBUFFERED=1 \
-    PYTHONDONTWRITEBYTECODE=1 \
-    PIP_NO_CACHE_DIR=off \
-    PIP_DISABLE_PIP_VERSION_CHECK=on
-
-# 安装编译依赖并编译安装所有需要的包
-RUN apk add --no-cache \
-    gcc \
-    musl-dev \
-    linux-headers \
-    && pip install -i https://mirrors.aliyun.com/pypi/simple/ psutil==5.9.5 \
-    # 注意:在这里安装所有需要编译的依赖
-    && pip install -i https://mirrors.aliyun.com/pypi/simple/ -r requirements.txt \
-    # 最后清理编译工具
-    && apk del gcc musl-dev linux-headers
-
-# 复制服务代码
-COPY parse_service.py .
-
-# 暴露服务端口
+# ========== 暴露端口 ==========
 EXPOSE 8000
 
-# 启动命令
-CMD ["python", "parse_service.py", "--host", "0.0.0.0", "--port", "8000"]
+# ========== 启动应用 ==========
+CMD ["python", "-m", "uvicorn", "parse_service:app", "--host", "0.0.0.0", "--port", "8000"]

BIN
parser/examples/test1.pdf


+ 25 - 0
parser/models/result.py

@@ -0,0 +1,25 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
+
+
@dataclass
class ParseResult:
    """Unified output structure shared by all parsers."""
    # Bug fix: the annotations used the builtin function `any` instead of
    # `typing.Any`, which type checkers reject.
    content: str = ""                                        # parsed Markdown text
    metadata: Dict[str, Any] = field(default_factory=dict)   # page count, author, duration, ...
    file_type: str = ""                                      # detected concrete type
    tables: List[Dict] = field(default_factory=list)         # extracted structured table data

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dictionary."""
        return {
            "content": self.content,
            "metadata": self.metadata,
            "file_type": self.file_type,
            "tables": self.tables
        }

    def to_json(self) -> str:
        """Serialize to a pretty-printed JSON string (non-ASCII kept as-is)."""
        import json
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

+ 21 - 81
parser/parse_service.py

@@ -5,8 +5,9 @@ import argparse
 from fastapi import FastAPI, BackgroundTasks
 from pydantic import BaseModel
 from typing import Dict, Optional, List
-import psutil
 import os
+from core.router import ParserFactory
+
 
 # 初始化FastAPI应用
 app = FastAPI(title="Python解析服务", version="1.0")
@@ -22,114 +23,53 @@ service_status = {
 # 锁机制,保证多线程安全
 task_lock = threading.Lock()
 
-# 任务请求模型
-class ParseTask(BaseModel):
-    task_id: str
-    file_path: str
-    parse_params: Optional[Dict] = None  # 解析参数,如解析类型、阈值等
-
-# ------------------------------
-# 核心解析逻辑(模拟真实解析器调用)
-# ------------------------------
-def parse_task_worker(task_id: str, file_path: str, parse_params: Dict):
-    """
-    后台解析任务执行函数
-    """
-    try:
-        with task_lock:
-            task_status[task_id] = {
-                "status": "running",
-                "progress": 0,
-                "result": None,
-                "error": None
-            }
-        
-        # 模拟解析过程(实际场景替换为真实解析器调用)
-        total_steps = 10
-        for step in range(total_steps):
-            time.sleep(1)  # 模拟解析耗时
-            progress = (step + 1) * 10
-            with task_lock:
-                task_status[task_id]["progress"] = progress
-        
-        # 解析完成,模拟返回结果
-        with task_lock:
-            task_status[task_id]["status"] = "completed"
-            task_status[task_id]["result"] = {
-                "file_path": file_path,
-                "parse_result": "解析成功(模拟结果)",
-                "parse_params": parse_params,
-                "finish_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            }
-    
-    except Exception as e:
-        with task_lock:
-            task_status[task_id]["status"] = "failed"
-            task_status[task_id]["error"] = str(e)
 
 # ------------------------------
 # 接口定义
 # ------------------------------
 @app.post("/execute", summary="接收解析任务并执行")
-async def execute_task(task: ParseTask, background_tasks: BackgroundTasks):
+async def execute_task(file_path: str):
     """
     接收Java端下发的解析任务,后台异步执行
     """
-    # 检查任务ID是否已存在
-    if task.task_id in task_status:
-        return {
-            "code": 400,
-            "msg": f"任务ID {task.task_id} 已存在",
-            "data": None
-        }
+
+    factory = ParserFactory()
     
-    # 提交后台任务执行
-    background_tasks.add_task(parse_task_worker, task.task_id, task.file_path, task.parse_params)
+    # 解析文件
+    result = await factory.parse(file_path)
     
-    return {
-        "code": 200,
-        "msg": "任务接收成功,已开始执行",
-        "data": {"task_id": task.task_id}
-    }
-
-@app.get("/status/{task_id}", summary="查询指定任务状态")
-async def get_task_status(task_id: str):
-    """
-    查询单个任务的执行状态、进度、结果
-    """
-    if task_id not in task_status:
-        return {
-            "code": 404,
-            "msg": f"任务ID {task_id} 不存在",
-            "data": None
-        }
     
+    print(result.content)
+    
+    # 生成并显示性能报告
+    report = factory.generate_performance_report()
+    print("\n" + "="*80)
+    print(report)
+    print("="*80)
+    
+
     return {
         "code": 200,
-        "msg": "查询成功",
-        "data": task_status[task_id]
+        "msg": "任务执行成功",
+        "data": result.content
     }
 
-
 @app.get("/status", summary="状态接口")
 async def health_check():
     """
     返回实例健康状态、资源使用情况
     """
-    cpu_usage = psutil.cpu_percent(interval=1)
     
-    # 内存信息
-    memory = psutil.virtual_memory()
-    memory_usage = memory.percent
+    
     
     return {
     "code": 200,
     "msg": "success",
     "data": {
         "status": 0,
-        "cpu_usage": cpu_usage,
+        "cpu_usage": 0.2,
         "gpu_usage": 0.0,
-        "memory_usage": memory_usage,
+        "memory_usage": 0.2,
         "gpu_memory": 0.0
     }
 }

+ 137 - 0
parser/parsers/audio_parser.py

@@ -0,0 +1,137 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+from utils.ffmpeg_wrapper import FFmpegWrapper
+import os
+import tempfile
+import requests
+
+
class AudioParser(Parser):
    """Audio parser: normalizes input with ffmpeg and transcribes it with a
    locally deployed Qwen3-ASR service."""

    def __init__(self):
        self.ffmpeg = FFmpegWrapper()
        # Dedicated transcription endpoint of the local Qwen3-ASR deployment.
        # NOTE(review): hard-coded internal address — should come from config.
        self.qwen_asr_api_url = "http://10.192.72.13:7283/v1/audio/transcriptions"
        log.info("音频解析器初始化完成,使用本地部署的Qwen3-ASR模型")

    async def parse(self, file_path: str) -> ParseResult:
        """
        Parse an audio file: convert it to 16k/16bit/mono WAV, send it to
        the ASR endpoint and wrap the transcription in a ParseResult.

        Args:
            file_path: Path of the audio file.

        Returns:
            ParseResult: file_type "audio"; content is the transcription or
            a human-readable failure note (best-effort, never raises).
        """
        log.info(f"开始解析音频文件: {file_path}")
        temp_wav_path = None
        try:
            # 1. Preprocessing: allocate a temp WAV target and convert.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_wav_path = temp_file.name

            log.info(f"创建临时文件: {temp_wav_path}")
            self.ffmpeg.convert_audio(file_path, temp_wav_path)
            log.info(f"音频转换完成: {temp_wav_path}")

            # 2. Speech recognition via Qwen3-ASR.
            log.info("开始使用Qwen3-ASR进行语音识别...")
            log.info(f"使用的API地址: {self.qwen_asr_api_url}")

            content = []
            try:
                file_size = os.path.getsize(temp_wav_path)
                log.info(f"音频文件大小: {file_size / (1024 * 1024):.2f} MB")

                log.info("准备使用文件上传方式进行语音识别...")

                session = requests.Session()
                session.trust_env = False  # ignore proxy settings from the environment

                data = {
                    'model': '/data/shared/Qwen3-ASR/qwen/Qwen3-ASR-1.7B',
                    'language': 'zh',
                    'response_format': 'json'
                }

                log.info("开始发送请求...")
                # Bug fix: the upload handle was opened inline and never
                # closed; the context manager guarantees it is released.
                # Generous timeout — transcription can take a while.
                with open(temp_wav_path, 'rb') as wav_file:
                    files = {
                        'file': ('audio.wav', wav_file, 'audio/wav')
                    }
                    response = session.post(self.qwen_asr_api_url, files=files, data=data, timeout=600)
                log.info(f"请求完成,状态码: {response.status_code}")

                # Dump the body for debugging when the service misbehaves.
                if response.status_code != 200:
                    log.warning(f"响应内容: {response.text}")

                response.raise_for_status()
                result = response.json()

                log.info("Qwen3-ASR语音识别完成")
                log.info(f"识别结果: {result}")

                # 3. Build the textual result.
                if result and 'text' in result:
                    full_text = result['text']
                    # Strip the model's language/ASR marker from the output.
                    clean_text = full_text.replace('language Chinese<asr_text>', '').strip()
                    content.append(f"完整文本: {clean_text}")
                else:
                    log.warning("解析失败:未获取到有效结果")
                    content.append("解析失败:未获取到有效结果")

                log.info(f"构建完成,内容长度: {len(content)}")
            except requests.exceptions.Timeout as e:
                log.error(f"Qwen3-ASR语音识别超时: {str(e)}")
                content.append("语音识别超时,请检查服务是否正常运行")
            except requests.exceptions.ConnectionError as e:
                log.error(f"Qwen3-ASR语音识别连接错误: {str(e)}")
                content.append("语音识别连接错误,请检查服务地址是否正确")
            except Exception as e:
                log.error(f"Qwen3-ASR语音识别失败: {str(e)}")
                import traceback
                log.error(f"异常堆栈: {traceback.format_exc()}")
                # Best effort: still return a result even if ASR failed.
                content.append("语音识别失败,但文件已成功处理")

            return ParseResult(
                content="\n".join(content),
                metadata={
                    "parser": "Qwen3-ASR",
                    "file_size": os.path.getsize(file_path),
                    "api_url": self.qwen_asr_api_url
                },
                file_type="audio"
            )
        except Exception as e:
            log.error(f"音频文件解析失败: {str(e)}")
            import traceback
            log.error(f"异常堆栈: {traceback.format_exc()}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="audio"
            )
        finally:
            # Temp-file cleanup now lives in one place for both code paths
            # (it was previously duplicated in the success and error branches).
            if temp_wav_path and os.path.exists(temp_wav_path):
                try:
                    os.remove(temp_wav_path)
                    log.info(f"临时文件已清理: {temp_wav_path}")
                except OSError:
                    pass

+ 282 - 0
parser/parsers/native_parser.py

@@ -0,0 +1,282 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+import fitz  # PyMuPDF
+from docx import Document
+import openpyxl
+from pptx import Presentation
+import os
+
+
class NativeDocParser(Parser):
    """Parser for native (text-layer) documents: Office files and born-digital PDFs."""

    async def parse(self, file_path: str) -> ParseResult:
        """
        Dispatch on the file extension and parse the document.

        Args:
            file_path: Path of the document.

        Returns:
            ParseResult: Parsed content; on failure file_type is "unknown"
            and metadata carries the error message.
        """
        log.info(f"开始解析原生文档: {file_path}")
        try:
            ext = os.path.splitext(file_path)[1].lower()

            # Extension -> handler dispatch table.
            handlers = {
                '.pdf': self._parse_pdf,
                '.docx': self._parse_docx,
                '.doc': self._parse_doc,
                '.xlsx': self._parse_xlsx,
                '.pptx': self._parse_pptx,
            }
            handler = handlers.get(ext)
            if handler is None:
                raise Exception(f"不支持的文件扩展名: {ext}")
            return await handler(file_path)
        except Exception as e:
            log.error(f"原生文档解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="unknown"
            )

    async def _parse_pdf(self, file_path: str) -> ParseResult:
        """
        Extract the text of every page of a native PDF as Markdown sections.

        Returns:
            ParseResult: file_type "pdf"; tables stays empty (PyMuPDF's
            table extraction is limited — use a dedicated library if needed).
        """
        doc = fitz.open(file_path)
        content = []
        tables = []
        page_count = len(doc)

        for page_num in range(page_count):
            page = doc[page_num]
            text = page.get_text()
            content.append(f"# 第{page_num + 1}页\n{text}")

        doc.close()

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "page_count": page_count,
                "file_size": os.path.getsize(file_path)
            },
            file_type="pdf",
            tables=tables
        )

    async def _parse_docx(self, file_path: str) -> ParseResult:
        """
        Extract headings, paragraphs and tables from a Word (.docx) file.

        Returns:
            ParseResult: file_type "docx"; tables holds raw cell matrices.
        """
        doc = Document(file_path)
        content = []
        tables = []

        # Headings and body text.
        for para in doc.paragraphs:
            if para.style.name.startswith('Heading'):
                # Map "Heading N" to an N-level Markdown heading.
                # Bug fix: styles named just "Heading" (or with a non-numeric
                # suffix) used to crash on int(); fall back to level 1.
                parts = para.style.name.split(' ')
                level = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else 1
                content.append(f"{'#' * level} {para.text}")
            else:
                content.append(para.text)

        # Tables -> Markdown plus raw data.
        for table_idx, table in enumerate(doc.tables):
            if not table.rows:
                # Bug fix: an empty table used to crash on rows[0].
                continue

            table_content = []
            table_data = []

            header = [cell.text.strip() for cell in table.rows[0].cells]
            table_content.append('| ' + ' | '.join(header) + ' |')
            table_content.append('| ' + ' | '.join(['---'] * len(header)) + ' |')
            table_data.append(header)

            for row in table.rows[1:]:
                row_data = [cell.text.strip() for cell in row.cells]
                table_content.append('| ' + ' | '.join(row_data) + ' |')
                table_data.append(row_data)

            content.append('\n'.join(table_content))
            tables.append({
                "table_id": table_idx,
                "data": table_data
            })

        return ParseResult(
            content="\n".join(content),
            metadata={
                "paragraph_count": len(doc.paragraphs),
                "table_count": len(doc.tables),
                "file_size": os.path.getsize(file_path)
            },
            file_type="docx",
            tables=tables
        )

    async def _parse_xlsx(self, file_path: str) -> ParseResult:
        """
        Extract every worksheet of an Excel (.xlsx) file as a Markdown table.

        Returns:
            ParseResult: file_type "xlsx"; tables holds one cell matrix
            per worksheet.
        """
        wb = openpyxl.load_workbook(file_path)
        content = []
        tables = []

        def cell_text(value) -> str:
            # Bug fix: the old truthiness check (`if cell_value`) turned
            # falsy values such as 0 or 0.0 into empty strings.
            return str(value) if value is not None else ''

        for sheet_name in wb.sheetnames:
            sheet = wb[sheet_name]
            content.append(f"# 工作表: {sheet_name}")

            table_data = []
            max_row = sheet.max_row
            max_col = sheet.max_column

            # First row is treated as the header.
            header = [cell_text(sheet.cell(row=1, column=col).value)
                      for col in range(1, max_col + 1)]
            table_data.append(header)

            for row in range(2, max_row + 1):
                table_data.append([cell_text(sheet.cell(row=row, column=col).value)
                                   for col in range(1, max_col + 1)])

            # Render as a Markdown table.
            if header:
                markdown_table = ['| ' + ' | '.join(header) + ' |',
                                  '| ' + ' | '.join(['---'] * len(header)) + ' |']
                for row_data in table_data[1:]:
                    markdown_table.append('| ' + ' | '.join(row_data) + ' |')
                content.append('\n'.join(markdown_table))

            tables.append({
                "sheet_name": sheet_name,
                "data": table_data
            })

        # Capture the sheet count before releasing the workbook.
        sheet_count = len(wb.sheetnames)
        wb.close()

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "sheet_count": sheet_count,
                "file_size": os.path.getsize(file_path)
            },
            file_type="xlsx",
            tables=tables
        )

    async def _parse_pptx(self, file_path: str) -> ParseResult:
        """
        Extract titles, body text and speaker notes from a PowerPoint
        (.pptx) file.

        Returns:
            ParseResult: file_type "pptx".
        """
        prs = Presentation(file_path)
        content = []

        for slide_idx, slide in enumerate(prs.slides):
            content.append(f"# 幻灯片 {slide_idx + 1}")

            for shape_idx, shape in enumerate(slide.shapes):
                if hasattr(shape, 'text_frame') and shape.text_frame.text:
                    if shape_idx == 0:  # assume the first shape is the title
                        content.append(f"## {shape.text_frame.text}")
                    else:
                        content.append(shape.text_frame.text)

            # Bug fix: accessing slide.notes_slide CREATES a notes slide when
            # one is missing (python-pptx API); check has_notes_slide first.
            if slide.has_notes_slide:
                notes = slide.notes_slide.notes_text_frame.text
                if notes:
                    content.append(f"### 备注\n{notes}")

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "slide_count": len(prs.slides),
                "file_size": os.path.getsize(file_path)
            },
            file_type="pptx"
        )

    async def _parse_doc(self, file_path: str) -> ParseResult:
        """
        Extract the text of a legacy Word (.doc) file via the external
        `antiword` binary (must be installed on the host/image).

        Raises (caught by parse()):
            Exception: when antiword is missing or exits non-zero.
        """
        import subprocess
        try:
            completed = subprocess.run(
                ['antiword', file_path],
                capture_output=True,
                text=True,
                check=True
            )
            text = completed.stdout
        except Exception as e:
            log.error(f"antiword解析失败: {str(e)}")
            raise Exception(f"antiword解析失败: {str(e)}")

        return ParseResult(
            content=text,
            metadata={
                "file_size": os.path.getsize(file_path)
            },
            file_type="doc"
        )

+ 43 - 0
parser/parsers/text_parser.py

@@ -0,0 +1,43 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+
+
class TextParser(Parser):
    """Plain-text file parser."""

    async def parse(self, file_path: str) -> ParseResult:
        """
        Parse a text file into a ParseResult.

        Args:
            file_path: path to the file

        Returns:
            ParseResult: parse result; on failure content is empty and
                metadata carries an "error" entry.
        """
        log.info(f"开始解析文本文件: {file_path}")
        try:
            # Decode as UTF-8, replacing undecodable bytes so parsing
            # never fails on a bad encoding.
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                content = f.read()

            result = ParseResult(
                content=content,
                metadata={
                    # Actual on-disk size in bytes (the original reported the
                    # decoded character count under this key, which was
                    # misleading); the character count keeps its own key.
                    "file_size": self._get_file_size(file_path),
                    "char_count": len(content),
                    "line_count": len(content.split('\n'))
                },
                file_type="text"
            )

            log.info(f"文本文件解析完成,大小: {len(content)} 字符")
            return result
        except Exception as e:
            log.error(f"文本文件解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="text"
            )

+ 151 - 0
parser/parsers/video_parser.py

@@ -0,0 +1,151 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+from utils.ffmpeg_wrapper import FFmpegWrapper
+import os
+import tempfile
+import base64
+import requests
+from parsers.audio_parser import AudioParser
+
+
class VideoParser(Parser):
    """Video file parser: transcribes the audio track and captions keyframes."""

    def __init__(self):
        self.ffmpeg = FFmpegWrapper()
        self.audio_parser = AudioParser()
        # Qwen3-VL multimodal endpoint (OpenAI-compatible chat completions API).
        self.qwen_api_url = "http://10.192.72.13:7280/v1/chat/completions"

    async def parse(self, file_path: str) -> ParseResult:
        """
        Parse a video file.

        Pipeline: extract the audio track -> transcribe it with AudioParser ->
        sample keyframes -> caption each keyframe with Qwen3-VL -> merge both
        into one markdown document.

        Args:
            file_path: path to the video file

        Returns:
            ParseResult: combined transcript and per-keyframe captions; on
                failure content is empty and metadata carries the error.
        """
        log.info(f"开始解析视频文件: {file_path}")
        temp_audio_path = None
        try:
            # 1. Extract the audio track into a temporary WAV file.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_audio_path = temp_file.name

            self.ffmpeg.extract_audio(file_path, temp_audio_path)
            log.info(f"音频提取完成: {temp_audio_path}")

            # 2. Transcribe the extracted audio.
            audio_result = await self.audio_parser.parse(temp_audio_path)
            log.info("音频解析完成")

            # 3. Sample frames at a fixed interval, then keep only frames that
            #    differ noticeably from the previously kept one (frame diff).
            frame_results = []
            with tempfile.TemporaryDirectory() as temp_dir:
                interval_seconds = 10
                diff_threshold = 15.0
                keyframes = self.ffmpeg.extract_keyframes(
                    file_path, temp_dir,
                    interval=interval_seconds, diff_threshold=diff_threshold
                )
                log.info(f"关键帧提取完成(帧差阈值={diff_threshold}),共{len(keyframes)}张")

                # 4. Caption each keyframe with Qwen3-VL; derive the timestamp
                #    from the frame's sequence number in its filename.
                for idx, frame_path in enumerate(keyframes):
                    try:
                        frame_content = self._parse_frame_with_qwen(frame_path)
                        log.info(f"解析关键帧 {idx+1} 结果长度: {len(frame_content) if frame_content else 0}")
                        if frame_content:
                            # Filenames look like frame_000001.jpg; the number
                            # is the 1-based index of the fixed-interval sample.
                            try:
                                base = os.path.basename(frame_path)
                                num_part = base.split('_')[1].split('.')[0]
                                frame_index = int(num_part)
                                time_second = (frame_index - 1) * interval_seconds
                            except Exception:
                                # Fall back to the position in the filtered list.
                                time_second = idx * interval_seconds

                            frame_results.append((time_second, frame_content))
                            log.info(f"添加关键帧 到结果列表,时间:{time_second}s")
                        else:
                            log.warning(f"关键帧 {idx+1} 解析结果为空")
                    except Exception as e:
                        log.warning(f"解析关键帧 {idx+1} 失败: {str(e)}")

            log.info(f"关键帧解析完成,frame_results长度: {len(frame_results)}")

            # 5. Merge the transcript and the visual captions.
            content = []
            content.append("# 音频内容")
            content.append(audio_result.content)

            if frame_results:
                log.info("开始添加画面内容到结果")
                content.append("\n# 画面内容")
                for time_second, frame_content in frame_results:
                    content.append(f"\n## 第{time_second}秒")
                    content.append(frame_content)
                    log.info(f"添加第{time_second}秒画面内容,长度: {len(frame_content)}")
            else:
                log.warning("没有画面内容可以添加")

            return ParseResult(
                content="\n".join(content),
                metadata={
                    "parser": "VideoParser",
                    "file_size": os.path.getsize(file_path),
                    "audio_parser": "Qwen3-ASR",
                    "visual_parser": "Qwen3-VL",
                    "keyframe_count": len(keyframes)
                },
                file_type="video"
            )
        except Exception as e:
            log.error(f"视频文件解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="video"
            )
        finally:
            # Guaranteed cleanup of the temporary audio file on every exit
            # path (the original repeated the removal in both branches).
            if temp_audio_path and os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)

    def _parse_frame_with_qwen(self, image_path: str) -> str:
        """
        Caption a single image with the Qwen3-VL model.

        Args:
            image_path: path to the JPEG keyframe

        Returns:
            str: the model's textual description of the image

        Raises:
            requests.HTTPError: if the model endpoint returns an error status
        """
        log.info(f"使用Qwen3-VL解析图片: {image_path}")

        # Inline the image as a base64 data URL.
        with open(image_path, "rb") as f:
            base64_image = base64.b64encode(f.read()).decode("utf-8")

        payload = {
            "model": "/model",
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
                    {"type": "text", "text": "详细描述这张图片的内容,包括人物、物体、场景、文字等所有可见信息"}
                ]
            }],
            "max_tokens": 512
        }

        response = requests.post(self.qwen_api_url, json=payload, timeout=120)
        response.raise_for_status()
        result = response.json()

        return result["choices"][0]["message"]["content"]

+ 336 - 0
parser/parsers/visual_parser.py

@@ -0,0 +1,336 @@
+from core.router import Parser
+from models.result import ParseResult
+from utils.logger import log
+import os
+import time
+import httpx
+import json
+import io
+import zipfile
+from pathlib import Path
+import asyncio
+
+# 延迟导入PaddleOCR,避免模块级初始化
+
+
class VisualDocParser(Parser):
    """Visual document parser for images and scanned PDFs.

    The primary backend is a locally deployed MinerU service; a PaddleOCR
    code path is kept as a currently-unwired fallback.
    """

    def __init__(self):
        # Local MinerU deployment; no API key is required.
        self.mineru_api_key = ""
        self.base_url = "http://10.192.72.13:7284"
        self.model_version = "hybrid-auto-engine"
        self.poll_interval_sec = 3.0
        self.max_wait_sec = 300.0
        # PaddleOCR instance for the fallback path; deliberately left
        # uninitialized (delayed import avoids heavy module-level startup).
        # The original never defined this attribute, so _use_paddleocr raised
        # AttributeError instead of reaching its None guard.
        self.ocr = None
        log.info("VisualDocParser初始化完成,使用本地部署的MinerU服务")

    async def parse(self, file_path: str) -> ParseResult:
        """
        Parse a visual document (image or scanned PDF).

        Args:
            file_path: path to the file

        Returns:
            ParseResult: parse result; on failure content is empty and
                metadata carries an "error" entry.
        """
        log.info(f"开始解析视觉文档: {file_path}")
        try:
            # Only the MinerU API path is used here, avoiding PaddleOCR's
            # expensive and fragile initialization.
            result = await self._try_mineru(file_path)
            if result:
                return result

            # MinerU failed: report the error instead of falling back.
            return ParseResult(
                content="",
                metadata={"error": "MinerU API解析失败"},
                file_type="visual"
            )
        except Exception as e:
            log.error(f"视觉文档解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="visual"
            )

    async def _try_mineru(self, file_path: str) -> ParseResult:
        """
        Try to parse the file via the local MinerU API.

        Args:
            file_path: path to the file

        Returns:
            ParseResult: the parse result, or None when the service fails.
        """
        try:
            log.info("开始使用本地MinerU API解析文件")

            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                raise FileNotFoundError(str(file_path))

            log.info(f"Calling local MinerU for file: {file_path}")

            # Upload the file and block until the service returns its result.
            result = await self._upload_and_parse(file_path_obj)

            # Pull the usable text out of the service's JSON payload.
            text_content = self._extract_text_from_local_result(result)

            return ParseResult(
                content=text_content,
                metadata={
                    "parser": "Local MinerU API",
                    "file_size": file_path_obj.stat().st_size,
                    "backend": self.model_version
                },
                file_type="visual"
            )

        except Exception as e:
            log.warning(f"本地MinerU API解析失败: {str(e)}")
            return None

    async def _upload_and_parse(self, file_path: Path) -> dict:
        """
        Upload the file to the local MinerU service and return its JSON reply.

        Args:
            file_path: path to the file

        Returns:
            dict: the service's JSON payload

        Raises:
            httpx.HTTPStatusError: if the service responds with an error status
        """
        url = f"{self.base_url}/file_parse"

        data = {
            "backend": self.model_version,
            "lang_list": ["ch"],
            "return_md": True,
            "formula_enable": True,
            "table_enable": True
        }

        # Open the upload handle in a context manager so the descriptor is
        # always closed (the original leaked it).
        with open(file_path, 'rb') as fh:
            files = {
                "files": (file_path.name, fh)
            }
            async with httpx.AsyncClient(timeout=300) as client:
                resp = await client.post(url, files=files, data=data)
                resp.raise_for_status()
                result = resp.json()

        return result

    def _extract_text_from_local_result(self, result: dict) -> str:
        """
        Extract text from a local-MinerU response, tolerating the several
        response shapes the service has been observed to return.

        Args:
            result: the JSON payload returned by the local MinerU service

        Returns:
            str: the extracted text (empty if no known field matched)
        """
        text_parts = []

        if isinstance(result, dict):
            # Newer shape: {"results": {name: {"md_content"|"text": ...}}}
            if "results" in result:
                results = result["results"]
                if isinstance(results, dict):
                    for key, value in results.items():
                        if isinstance(value, dict):
                            if "md_content" in value:
                                text_parts.append(str(value["md_content"]))
                            elif "text" in value:
                                text_parts.append(str(value["text"]))
            # Older / alternative shapes, checked in priority order.
            elif "markdown" in result:
                text_parts.append(str(result["markdown"]))
            elif "text" in result:
                text_parts.append(str(result["text"]))
            elif "content" in result:
                if isinstance(result["content"], str):
                    text_parts.append(result["content"])
                elif isinstance(result["content"], list):
                    for item in result["content"]:
                        if isinstance(item, dict) and "text" in item:
                            text_parts.append(str(item["text"]))
                        elif isinstance(item, str):
                            text_parts.append(item)

        return "\n\n".join(text_parts)

    def _safe_stem(self, stem: str) -> str:
        """
        Build a cache-key-safe version of a filename stem by replacing every
        character outside [a-zA-Z0-9_-] with '_'.

        Args:
            stem: the filename stem

        Returns:
            str: the sanitized key
        """
        import re
        return re.sub(r'[^a-zA-Z0-9_-]', '_', stem)

    def _extract_text_from_payload(self, payload: dict) -> str:
        """
        Extract text from a MinerU cloud-style payload.

        Args:
            payload: the payload returned by MinerU

        Returns:
            str: the extracted text (empty if no known field matched)
        """
        text_parts = []

        # Handle the several possible payload shapes.
        if isinstance(payload, dict):
            if "text" in payload:
                text_parts.append(str(payload["text"]))
            elif "content" in payload:
                if isinstance(payload["content"], str):
                    text_parts.append(payload["content"])
                elif isinstance(payload["content"], list):
                    for item in payload["content"]:
                        if isinstance(item, dict) and "text" in item:
                            text_parts.append(str(item["text"]))
                        elif isinstance(item, str):
                            text_parts.append(item)
            elif "pages" in payload:
                # Per-page payload: emit a markdown heading before each page.
                for page_num, page_content in enumerate(payload["pages"], 1):
                    text_parts.append(f"# 第{page_num}页")
                    if isinstance(page_content, str):
                        text_parts.append(page_content)
                    elif isinstance(page_content, dict) and "text" in page_content:
                        text_parts.append(str(page_content["text"]))
        elif isinstance(payload, list):
            for item in payload:
                if isinstance(item, dict) and "text" in item:
                    text_parts.append(str(item["text"]))
                elif isinstance(item, str):
                    text_parts.append(item)

        return "\n\n".join(text_parts)

    async def _use_paddleocr(self, file_path: str) -> ParseResult:
        """
        Fallback OCR path using PaddleOCR.

        Args:
            file_path: path to the file

        Returns:
            ParseResult: parse result
        """
        log.info("使用PaddleOCR解析视觉文档")

        # self.ocr is only populated if PaddleOCR was wired up elsewhere;
        # the default set in __init__ is None.
        if self.ocr is None:
            log.error("PaddleOCR未初始化,无法解析")
            return ParseResult(
                content="",
                metadata={"error": "PaddleOCR未初始化"},
                file_type="visual"
            )

        # PDFs must be rasterized page by page before OCR.
        if file_path.endswith('.pdf'):
            return await self._ocr_pdf(file_path)
        else:
            return await self._ocr_image(file_path)

    async def _ocr_pdf(self, file_path: str) -> ParseResult:
        """
        OCR a PDF by rendering each page to an image and running PaddleOCR.

        Args:
            file_path: path to the PDF

        Returns:
            ParseResult: per-page OCR text
        """
        import fitz  # PyMuPDF

        doc = fitz.open(file_path)
        content = []
        page_count = len(doc)

        for page_num in range(page_count):
            page = doc[page_num]
            # Render at 300 DPI for OCR-friendly resolution.
            pix = page.get_pixmap(dpi=300)
            # NOTE(review): the temp image lands in the current working
            # directory; consider tempfile if CWD may be read-only.
            img_path = f"temp_page_{page_num}.png"
            pix.save(img_path)

            ocr_result = self.ocr.ocr(img_path, cls=True)
            page_text = []

            for line in ocr_result:
                for word_info in line:
                    # word_info is (box, (text, confidence)); keep the text.
                    page_text.append(word_info[1][0])

            content.append(f"# 第{page_num + 1}页\n{' '.join(page_text)}")

            # Remove the temporary page image.
            if os.path.exists(img_path):
                os.remove(img_path)

        doc.close()

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "parser": "PaddleOCR",
                "page_count": page_count,
                "file_size": os.path.getsize(file_path)
            },
            file_type="pdf_scanned"
        )

    async def _ocr_image(self, file_path: str) -> ParseResult:
        """
        OCR a single image file with PaddleOCR.

        Args:
            file_path: path to the image

        Returns:
            ParseResult: recognized text joined with spaces
        """
        ocr_result = self.ocr.ocr(file_path, cls=True)
        content = []

        for line in ocr_result:
            for word_info in line:
                # word_info is (box, (text, confidence)); keep the text.
                content.append(word_info[1][0])

        return ParseResult(
            content=' '.join(content),
            metadata={
                "parser": "PaddleOCR",
                "file_size": os.path.getsize(file_path)
            },
            file_type="image"
        )

+ 25 - 2
parser/requirements.txt

@@ -1,5 +1,28 @@
 fastapi==0.104.1
 uvicorn==0.24.0
 pydantic==2.4.2
-#psutil==5.9.6
-python-multipart==0.0.6
+python-multipart==0.0.6
+
+# 基础库
+filetype==1.2.0
+loguru==0.7.2
+pandas>=2.2.0
+
+# 文档处理
+PyMuPDF>=1.24.0
+python-docx==0.8.11
+openpyxl==3.1.2
+python-pptx==0.6.21
+
+# OCR处理
+paddleocr>=2.8.0
+paddlepaddle>=3.0.0
+
+# 音频处理
+funasr>=0.3.1
+
+# 视频处理(FFmpeg需要系统安装)
+
+# 开发工具
+pytest==7.4.3
+black==23.11.0

+ 165 - 0
parser/utils/ffmpeg_wrapper.py

@@ -0,0 +1,165 @@
+import subprocess
+import os
+from typing import Optional
+try:
+    import cv2
+    import numpy as np
+except Exception:
+    cv2 = None
+    np = None
+
+
class FFmpegWrapper:
    """Thin wrapper around the ffmpeg CLI for audio extraction and keyframe sampling."""

    def __init__(self):
        # Assume ffmpeg is available on the system PATH.
        self.ffmpeg_path = "ffmpeg"

    def extract_audio(self, video_path: str, output_audio_path: str) -> bool:
        """
        Extract a video's audio track as 16 kHz / 16-bit / mono WAV.

        Args:
            video_path: input video path
            output_audio_path: output WAV path

        Returns:
            bool: True on success

        Raises:
            Exception: if ffmpeg exits with a non-zero status
        """
        try:
            cmd = [
                self.ffmpeg_path,
                "-i", video_path,
                "-vn",                   # drop the video stream
                "-acodec", "pcm_s16le",  # 16-bit PCM
                "-ar", "16000",          # 16 kHz sample rate
                "-ac", "1",              # mono
                "-y",                    # overwrite output
                output_audio_path
            ]

            subprocess.run(cmd, check=True, capture_output=True, text=True)
            return True
        except subprocess.CalledProcessError as e:
            raise Exception(f"音频提取失败: {e.stderr}") from e

    def convert_audio(self, input_audio_path: str, output_audio_path: str) -> bool:
        """
        Convert any audio file to 16 kHz / 16-bit / mono WAV.

        Args:
            input_audio_path: input audio path
            output_audio_path: output WAV path

        Returns:
            bool: True on success

        Raises:
            Exception: if ffmpeg exits with a non-zero status
        """
        try:
            cmd = [
                self.ffmpeg_path,
                "-i", input_audio_path,
                "-acodec", "pcm_s16le",
                "-ar", "16000",
                "-ac", "1",
                "-y",
                output_audio_path
            ]

            subprocess.run(cmd, check=True, capture_output=True, text=True)
            return True
        except subprocess.CalledProcessError as e:
            raise Exception(f"音频转换失败: {e.stderr}") from e

    def extract_keyframes(self, video_path: str, output_dir: str, interval: int = 60, diff_threshold: Optional[float] = None) -> list:
        """
        Sample frames from a video at a fixed interval, optionally filtering
        them down to "keyframes" with a frame-difference test.

        Args:
            video_path: input video path
            output_dir: directory the JPEG frames are written to
            interval: sampling interval in seconds
            diff_threshold: if given, keep a frame only when the mean absolute
                grayscale difference from the previously kept frame is >= this
                value; None keeps every sampled frame

        Returns:
            list: sorted paths of the kept frames

        Raises:
            Exception: if ffmpeg fails, or the frame-diff filter is requested
                without OpenCV/numpy installed
        """
        try:
            os.makedirs(output_dir, exist_ok=True)

            # Dump one frame every `interval` seconds as frame_000001.jpg, ...
            output_pattern = os.path.join(output_dir, "frame_%06d.jpg")
            cmd = [
                self.ffmpeg_path,
                "-i", video_path,
                "-vf", f"fps=1/{interval}",
                "-y",
                output_pattern
            ]

            subprocess.run(cmd, check=True, capture_output=True, text=True)

            # Collect the sampled frames in filename (i.e. time) order.
            frames = sorted(
                os.path.join(output_dir, name)
                for name in os.listdir(output_dir)
                if name.startswith("frame_") and name.endswith(".jpg")
            )

            # Without a threshold, return all fixed-interval samples.
            if diff_threshold is None:
                return frames

            if cv2 is None or np is None:
                raise Exception("OpenCV (opencv-python) 和 numpy 需要安装以启用帧差法(pip install opencv-python numpy)")

            # Compare each frame's grayscale image against the last *kept*
            # frame; keep it when the mean absolute difference reaches the
            # threshold. Only the previous boundary frame is held in memory
            # (the original decoded every frame into a list up front).
            resize_to = (320, 240)  # downscale to make the comparison cheap
            filtered = []
            prev_gray = None
            for frame_path in frames:
                img = cv2.imread(frame_path)  # BGR; None on unreadable files
                if img is None:
                    continue
                try:
                    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                    gray = cv2.resize(gray, resize_to, interpolation=cv2.INTER_AREA)
                except Exception:
                    continue

                if prev_gray is None:
                    # The first readable frame is always a keyframe.
                    filtered.append(frame_path)
                    prev_gray = gray
                    continue

                diff_val = np.mean(np.abs(gray.astype(int) - prev_gray.astype(int)))
                if diff_val >= float(diff_threshold):
                    filtered.append(frame_path)
                    prev_gray = gray

            return filtered
        except subprocess.CalledProcessError as e:
            raise Exception(f"关键帧提取失败: {e.stderr}") from e

+ 47 - 0
parser/utils/logger.py

@@ -0,0 +1,47 @@
+from loguru import logger
+import os
+
+
class Logger:
    """Logging setup helper built on loguru.

    Installs two sinks: a colorized console sink at INFO level and a
    rotating, zip-compressed file sink at DEBUG level.
    """

    def __init__(self, log_file: str = "parsing.log"):
        """
        Configure the loguru sinks.

        Args:
            log_file: path of the log file to write
        """
        # Replace loguru's default stderr handler with our own sinks.
        logger.remove()

        sink_configs = [
            # Console output (stdout via print), INFO and above, colorized.
            dict(
                sink=lambda msg: print(msg, end=""),
                level="INFO",
                format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
            ),
            # Log file, DEBUG and above, rotated at 100 MB and zip-compressed.
            dict(
                sink=log_file,
                level="DEBUG",
                rotation="100 MB",
                compression="zip",
                format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
            ),
        ]

        for cfg in sink_configs:
            logger.add(**cfg)

    @property
    def log(self):
        """
        The shared loguru logger instance.

        Returns:
            logger: loguru's module-level logger
        """
        return logger
+
+
# Module-level singleton: `log` is the logger the rest of the project imports.
logger_instance = Logger()
log = logger_instance.log

+ 51 - 0
parser/utils/mime_detector.py

@@ -0,0 +1,51 @@
+import filetype
+import os
+
+
class MimeDetector:
    """Detect a file's MIME type: magic-number sniffing first, extension fallback second."""

    # Extension -> MIME fallback table. Class-level so it is built once
    # instead of on every detect() call.
    _EXT_TO_MIME = {
        '.txt': 'text/plain',
        '.md': 'text/markdown',
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.doc': 'application/msword',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.xls': 'application/vnd.ms-excel',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        '.ppt': 'application/vnd.ms-powerpoint',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.wav': 'audio/wav',
        '.mp3': 'audio/mpeg',
        '.mp4': 'video/mp4',
        '.avi': 'video/x-msvideo',
        '.mov': 'video/quicktime'
    }

    def __init__(self):
        pass

    def detect(self, file_path: str) -> str:
        """
        Detect the MIME type of a file.

        Args:
            file_path: path to the file

        Returns:
            str: the MIME type; 'application/octet-stream' when unknown

        Raises:
            Exception: if detection fails unexpectedly
        """
        try:
            # Content-based sniffing via the filetype library first.
            kind = filetype.guess(file_path)
            if kind:
                return kind.mime
            # The sniffer gave up: fall back to the file extension.
            return self._guess_by_extension(file_path)
        except Exception as e:
            raise Exception(f"文件类型检测失败: {str(e)}") from e

    def _guess_by_extension(self, file_path: str) -> str:
        """Map the file's extension (case-insensitive) to a MIME type."""
        ext = os.path.splitext(file_path)[1].lower()
        return self._EXT_TO_MIME.get(ext, 'application/octet-stream')

+ 80 - 0
parser/utils/stability.py

@@ -0,0 +1,80 @@
+import asyncio
+import functools
+from typing import Callable, Any, List, Coroutine
+from utils.logger import log
+
+
def timeout(seconds: int):
    """
    Decorator that aborts an async function if it runs longer than `seconds`.

    Args:
        seconds: timeout in seconds

    Returns:
        Callable: the decorating function

    Raises:
        Exception: (from the wrapped call) when the timeout elapses
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            try:
                return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
            except asyncio.TimeoutError as e:
                log.error(f"函数 {func.__name__} 执行超时,已超过 {seconds} 秒")
                # Chain the TimeoutError so the original cause is preserved
                # (the original re-raise dropped it).
                raise Exception(f"执行超时,已超过 {seconds} 秒") from e
        return wrapper
    return decorator
+
+
class AsyncDispatcher:
    """Async dispatcher that runs coroutines concurrently under a concurrency cap."""

    def __init__(self, max_concurrency: int = 5):
        """
        Initialize the dispatcher.

        Args:
            max_concurrency: maximum number of tasks allowed to run at once
        """
        self.max_concurrency = max_concurrency

    async def run(self, tasks: List[Coroutine]) -> List[Any]:
        """
        Execute the given coroutines concurrently.

        Args:
            tasks: list of coroutines to await

        Returns:
            List[Any]: results in input order; a failed task yields None
        """
        log.info(f"开始并发执行 {len(tasks)} 个任务,最大并发数: {self.max_concurrency}")

        # The semaphore caps how many coroutines are awaited simultaneously.
        semaphore = asyncio.Semaphore(self.max_concurrency)

        async def bounded_task(task: Coroutine) -> Any:
            async with semaphore:
                try:
                    return await task
                except Exception as e:
                    # Failures are logged and mapped to None so one bad task
                    # cannot abort the whole batch.
                    log.error(f"任务执行失败: {str(e)}")
                    return None

        # bounded_task already converts every Exception to None, so the
        # original's return_exceptions=True plus the isinstance post-pass
        # was dead code and has been removed.
        results = await asyncio.gather(*[bounded_task(task) for task in tasks])

        log.info(f"并发任务执行完成,成功 {sum(1 for r in results if r is not None)} 个")
        return list(results)