| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- from typing import Optional, Type
- from abc import ABC, abstractmethod
- import fitz # PyMuPDF
- from utils.mime_detector import MimeDetector
- from utils.logger import log
- from models.result import ParseResult
class Parser(ABC):
    """Abstract base class that every concrete parser derives from."""

    @abstractmethod
    async def parse(self, file_path: str) -> ParseResult:
        """Parse a single file.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: The result produced by the concrete parser.
        """

    def _get_file_size(self, file_path: str) -> int:
        """Return the size of a file in bytes.

        This is a best-effort helper: any failure while querying the
        file system (missing file, bad argument, permission problem)
        is reported as a size of ``0`` instead of an exception.

        Args:
            file_path: Path of the file to measure.

        Returns:
            int: File size in bytes, or ``0`` if it cannot be determined.
        """
        import os

        try:
            size_in_bytes = os.stat(file_path).st_size
        except Exception:
            size_in_bytes = 0
        return size_in_bytes
class ParserFactory:
    """Factory that picks a parser for a file and tracks parsing statistics.

    Routing is done on the MIME type reported by ``MimeDetector``; PDF
    files get an additional text-density check to tell scanned documents
    from native ones. Parser instances are created lazily and cached, so
    each parser class is instantiated at most once per factory.
    """

    _BYTES_PER_MB = 1024 * 1024

    # A PDF whose first pages contain fewer valid characters than the
    # threshold is treated as a scanned document.
    _SCAN_SAMPLE_PAGES = 3
    _SCAN_MIN_VALID_CHARS = 50

    # Built-in parser class name -> module that defines it. The modules
    # are imported lazily, only when the parser is first needed.
    _BUILTIN_PARSER_MODULES = {
        "TextParser": "parsers.text_parser",
        "VisualDocParser": "parsers.visual_parser",
        "AudioParser": "parsers.audio_parser",
        "VideoParser": "parsers.video_parser",
        "NativeDocParser": "parsers.native_parser",
    }

    # (MIME prefix, parser class name, log message) for first-level routing.
    _PREFIX_ROUTES = (
        ("text/", "TextParser", "检测到文本文件,使用TextParser"),
        ("image/", "VisualDocParser", "检测到图片文件,使用VisualDocParser"),
        ("audio/", "AudioParser", "检测到音频文件,使用AudioParser"),
        ("video/", "VideoParser", "检测到视频文件,使用VideoParser"),
    )

    def __init__(self):
        """Create a factory with an empty registry, cache and statistics."""
        self.mime_detector = MimeDetector()
        self.parsers = {}
        self.parser_instances = {}  # cache of already constructed parsers
        # Running totals, updated once per ``parse`` call.
        self.stats = {
            'total_files': 0,
            'total_size': 0,
            'text_files': 0,
            'text_size': 0,
            'image_files': 0,
            'image_size': 0,
            'audio_files': 0,
            'audio_size': 0,
            'video_files': 0,
            'video_size': 0,
            'pdf_files': 0,
            'pdf_size': 0,
            'office_files': 0,
            'office_size': 0,
            'total_time': 0,
            'successful_files': 0,
            'failed_files': 0
        }

    def register_parser(self, mime_type: str, parser_class: Type[Parser]):
        """Register a parser class for an exact MIME type.

        Registered parsers are used by ``get_parser`` as a fallback for
        MIME types that the built-in routing does not handle.

        Args:
            mime_type: MIME type the parser is responsible for.
            parser_class: Parser class to instantiate for that type.
        """
        self.parsers[mime_type] = parser_class

    def _get_builtin_parser(self, parser_name: str) -> Parser:
        """Return the cached built-in parser, creating it on first use."""
        if parser_name not in self.parser_instances:
            import importlib

            module = importlib.import_module(
                self._BUILTIN_PARSER_MODULES[parser_name]
            )
            self.parser_instances[parser_name] = getattr(module, parser_name)()
        return self.parser_instances[parser_name]

    async def get_parser(self, file_path: str) -> Parser:
        """Select a parser from the file's MIME type and content.

        Args:
            file_path: Path of the file to be parsed.

        Returns:
            Parser: A (cached) parser instance suitable for the file.

        Raises:
            Exception: If neither the built-in routing nor the registry
                filled by ``register_parser`` supports the MIME type.
        """
        log.info(f"开始获取解析器,文件路径: {file_path}")
        # 1. Detect the MIME type of the file.
        mime_type = self.mime_detector.detect(file_path)
        log.info(f"文件MIME类型: {mime_type}")

        # 2. First-level routing on the MIME type prefix.
        for prefix, parser_name, message in self._PREFIX_ROUTES:
            if mime_type.startswith(prefix):
                log.info(message)
                return self._get_builtin_parser(parser_name)

        if mime_type == "application/pdf":
            # 3. Second-level routing: PDFs need a content check.
            log.info("检测到PDF文件,进入特殊路由")
            return await self._route_pdf(file_path)

        if "openxmlformats" in mime_type or mime_type == "application/msword":
            # Office documents (both docx and doc).
            log.info(f"检测到Office文件,MIME类型: {mime_type},使用NativeDocParser")
            return await self._route_office(file_path, mime_type)

        # 4. Fallback: parsers added through ``register_parser``. This only
        # affects MIME types that would otherwise be rejected below.
        registered_class = self.parsers.get(mime_type)
        if registered_class is not None:
            log.info(f"使用已注册的解析器: {registered_class.__name__}")
            # The fully qualified name cannot clash with the bare class
            # names used as cache keys for the built-in parsers.
            cache_key = f"{registered_class.__module__}.{registered_class.__qualname__}"
            if cache_key not in self.parser_instances:
                self.parser_instances[cache_key] = registered_class()
            return self.parser_instances[cache_key]

        log.error(f"不支持的文件类型: {mime_type}")
        raise Exception(f"不支持的文件类型: {mime_type}")

    async def _route_pdf(self, file_path: str) -> Parser:
        """Choose the parser for a PDF file.

        Args:
            file_path: Path of the PDF file.

        Returns:
            Parser: ``VisualDocParser`` for scanned PDFs, otherwise
            ``NativeDocParser``.
        """
        # Text-density check decides between scanned and native PDFs.
        if self._is_scanned_pdf(file_path):
            log.info("PDF为扫描件,使用VisualDocParser")
            return self._get_builtin_parser("VisualDocParser")

        log.info("PDF为原生文档,使用NativeDocParser")
        return self._get_builtin_parser("NativeDocParser")

    async def _route_office(self, file_path: str, mime_type: str) -> Parser:
        """Choose the parser for an Office document.

        Args:
            file_path: Path of the document (currently unused).
            mime_type: Detected MIME type (currently unused).

        Returns:
            Parser: The shared ``NativeDocParser`` instance.
        """
        return self._get_builtin_parser("NativeDocParser")

    def _is_scanned_pdf(self, file_path: str) -> bool:
        """Decide whether a PDF is a scanned document.

        The text of the first ``_SCAN_SAMPLE_PAGES`` pages is extracted
        and its alphanumeric and whitespace characters are counted; fewer
        than ``_SCAN_MIN_VALID_CHARS`` means the PDF is treated as scanned.

        Args:
            file_path: Path of the PDF file.

        Returns:
            bool: ``True`` if the PDF looks scanned, or if its text could
            not be extracted at all.
        """
        try:
            # The context manager closes the document even when text
            # extraction fails part-way through.
            with fitz.open(file_path) as doc:
                sample_pages = min(self._SCAN_SAMPLE_PAGES, len(doc))
                text_content = "".join(
                    doc[page_num].get_text() for page_num in range(sample_pages)
                )

            valid_chars = sum(
                1 for c in text_content if c.isalnum() or c.isspace()
            )
            log.info(f"PDF前{self._SCAN_SAMPLE_PAGES}页有效字符数: {valid_chars}")

            return valid_chars < self._SCAN_MIN_VALID_CHARS
        except Exception as e:
            log.error(f"PDF文本提取失败: {e}")
            # Fall back to VisualDocParser when extraction fails.
            return True

    @staticmethod
    def _stats_category(file_type: str) -> Optional[str]:
        """Map a result's ``file_type`` to a statistics category, if any."""
        if file_type.startswith('text'):
            return 'text'
        if file_type.startswith('image') or file_type == 'visual':
            return 'image'
        if file_type.startswith('audio'):
            return 'audio'
        if file_type.startswith('video'):
            return 'video'
        if file_type in ('pdf', 'pdf_scanned'):
            return 'pdf'
        if file_type == 'office':
            return 'office'
        return None

    def _record_stats(self, file_size, elapsed_time, *, succeeded, category=None):
        """Add one processed file to the running statistics."""
        stats = self.stats
        stats['total_files'] += 1
        stats['total_size'] += file_size
        stats['total_time'] += elapsed_time
        stats['successful_files' if succeeded else 'failed_files'] += 1
        if category is not None:
            stats[f'{category}_files'] += 1
            stats[f'{category}_size'] += file_size

    async def parse(self, file_path: str) -> ParseResult:
        """Parse a file; this is the factory's entry point.

        Failures are not raised: they are logged, counted as a failed
        file and reported through a ``ParseResult`` whose ``file_type``
        is ``"error"``. Every call is counted exactly once in ``stats``.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: The parser's result, or an error result.
        """
        import os
        import time

        start_time = time.perf_counter()

        try:
            file_size = os.path.getsize(file_path)
        except Exception:
            # Deliberately best-effort: the size only feeds the statistics;
            # a real problem with the path surfaces in get_parser below.
            file_size = 0

        log.info(f"开始解析文件: {file_path}, 文件大小: {file_size / (1024 * 1024):.2f} MB")

        try:
            parser = await self.get_parser(file_path)
            log.info(f"获取到解析器: {parser.__class__.__name__}")

            result = await parser.parse(file_path)
            elapsed_time = time.perf_counter() - start_time

            # Everything that can fail on a malformed result happens before
            # the statistics are touched, so a failure here is counted once
            # (as a failed file) instead of as both a success and a failure.
            category = self._stats_category(result.file_type)

            log.info(f"文件解析完成,耗时: {elapsed_time:.2f} 秒")
            log.info(f"文件类型: {result.file_type}")
            log.info(f"解析内容长度: {len(result.content)} 字符")
            log.info(f"元数据: {result.metadata}")
            if result.tables:
                log.info(f"提取到表格数量: {len(result.tables)}")
        except Exception as e:
            elapsed_time = time.perf_counter() - start_time
            self._record_stats(file_size, elapsed_time, succeeded=False)

            log.error(f"解析失败: {e}, 耗时: {elapsed_time:.2f} 秒")
            # Report the failure through an error result.
            return ParseResult(
                content=f"解析失败: {e}",
                metadata={"error": str(e)},
                file_type="error"
            )

        self._record_stats(
            file_size, elapsed_time, succeeded=True, category=category
        )
        return result

    def generate_performance_report(self) -> str:
        """Build a Markdown performance report from the statistics.

        All ratios and speeds are reported as ``0`` when their
        denominator (total size or total time) is zero, so the report can
        be generated before any file has been parsed.

        Returns:
            str: The report text.
        """
        stats = self.stats
        bytes_per_mb = self._BYTES_PER_MB

        total_size = stats['total_size']
        total_time = stats['total_time']
        total_size_mb = total_size / bytes_per_mb

        def share_of_total(size):
            """Percentage of the total size taken by ``size``."""
            return (size / total_size * 100) if total_size > 0 else 0

        def speed_mb_per_s(size):
            """Throughput in MB/s for ``size`` bytes over the total time."""
            return (size / bytes_per_mb / total_time) if total_time > 0 else 0

        lines = [
            "# 解析性能报告",
            "## 总体情况",
            f"- 总解析文件数: {stats['total_files']}",
            f"- 成功解析: {stats['successful_files']}",
            f"- 解析失败: {stats['failed_files']}",
            f"- 总文件大小: {total_size_mb:.2f} MB",
            f"- 总耗时: {total_time:.2f} 秒",
            f"- 平均解析速度: {speed_mb_per_s(total_size):.2f} MB/秒",
            "## 文件类型分布",
        ]

        distribution = (
            ('text', "文本"),
            ('image', "图片"),
            ('audio', "音频"),
            ('video', "视频"),
            ('pdf', "PDF"),
            ('office', "Office"),
        )
        for key, label in distribution:
            count = stats[f'{key}_files']
            size = stats[f'{key}_size']
            lines.append(
                f"- {label}文件: {count} 个, {size / bytes_per_mb:.2f} MB, "
                f"占比: {share_of_total(size):.2f}%"
            )

        # NOTE(review): no per-category timing is tracked, so these speeds
        # divide each category's size by the total time of all files.
        lines.append("## 性能分析")
        for key, label in distribution[:4]:
            speed = speed_mb_per_s(stats[f'{key}_size'])
            lines.append(
                f"- {label}类平均解析速度: {speed:.2f} MB/秒 (如果有{label}文件)"
            )

        return "\n".join(lines) + "\n"
|