from core.router import Parser from models.result import ParseResult from utils.logger import log import fitz # PyMuPDF from docx import Document import openpyxl from pptx import Presentation import os class NativeDocParser(Parser): """原生文档解析器,处理Office文档和原生PDF""" async def parse(self, file_path: str) -> ParseResult: """ 解析原生文档 Args: file_path: 文件路径 Returns: ParseResult: 解析结果 """ log.info(f"开始解析原生文档: {file_path}") try: # 根据文件扩展名判断文件类型 ext = os.path.splitext(file_path)[1].lower() if ext == '.pdf': return await self._parse_pdf(file_path) elif ext == '.docx': return await self._parse_docx(file_path) elif ext == '.doc': return await self._parse_doc(file_path) elif ext == '.xlsx': return await self._parse_xlsx(file_path) elif ext == '.pptx': return await self._parse_pptx(file_path) else: raise Exception(f"不支持的文件扩展名: {ext}") except Exception as e: log.error(f"原生文档解析失败: {str(e)}") return ParseResult( content="", metadata={"error": str(e)}, file_type="unknown" ) async def _parse_pdf(self, file_path: str) -> ParseResult: """ 解析PDF文件 Args: file_path: 文件路径 Returns: ParseResult: 解析结果 """ doc = fitz.open(file_path) content = [] tables = [] page_count = len(doc) # 遍历所有页面 for page_num in range(page_count): page = doc[page_num] # 提取文本 text = page.get_text() content.append(f"# 第{page_num + 1}页\n{text}") # 提取表格(PyMuPDF的表格提取功能有限) # 这里可以根据需要使用更高级的表格提取库 doc.close() return ParseResult( content="\n\n".join(content), metadata={ "page_count": page_count, "file_size": os.path.getsize(file_path) }, file_type="pdf", tables=tables ) async def _parse_docx(self, file_path: str) -> ParseResult: """ 解析Word文档 Args: file_path: 文件路径 Returns: ParseResult: 解析结果 """ doc = Document(file_path) content = [] tables = [] # 提取标题和正文 for para in doc.paragraphs: if para.style.name.startswith('Heading'): # 根据标题级别添加Markdown标题 level = int(para.style.name.split(' ')[1]) content.append(f"{'#' * level} {para.text}") else: content.append(para.text) # 提取表格 for table_idx, table in enumerate(doc.tables): table_content = [] table_data = [] # 提取表头 header_cells = table.rows[0].cells header = [cell.text.strip() for cell in header_cells] table_content.append('| ' + ' | '.join(header) + ' |') table_content.append('| ' + ' | '.join(['---'] * len(header)) + ' |') table_data.append(header) # 提取表格内容 for row in table.rows[1:]: cells = row.cells row_data = [cell.text.strip() for cell in cells] table_content.append('| ' + ' | '.join(row_data) + ' |') table_data.append(row_data) content.append('\n'.join(table_content)) tables.append({ "table_id": table_idx, "data": table_data }) return ParseResult( content="\n".join(content), metadata={ "paragraph_count": len(doc.paragraphs), "table_count": len(doc.tables), "file_size": os.path.getsize(file_path) }, file_type="docx", tables=tables ) async def _parse_xlsx(self, file_path: str) -> ParseResult: """ 解析Excel文档 Args: file_path: 文件路径 Returns: ParseResult: 解析结果 """ wb = openpyxl.load_workbook(file_path) content = [] tables = [] # 遍历所有工作表 for sheet_idx, sheet_name in enumerate(wb.sheetnames): sheet = wb[sheet_name] content.append(f"# 工作表: {sheet_name}") # 提取表格数据 table_data = [] max_row = sheet.max_row max_col = sheet.max_column # 提取表头 header = [] for col in range(1, max_col + 1): cell_value = sheet.cell(row=1, column=col).value header.append(str(cell_value) if cell_value else '') table_data.append(header) # 提取表格内容 for row in range(2, max_row + 1): row_data = [] for col in range(1, max_col + 1): cell_value = sheet.cell(row=row, column=col).value row_data.append(str(cell_value) if cell_value else '') table_data.append(row_data) # 转换为Markdown表格 if header: markdown_table = [] markdown_table.append('| ' + ' | '.join(header) + ' |') markdown_table.append('| ' + ' | '.join(['---'] * len(header)) + ' |') for row_data in table_data[1:]: markdown_table.append('| ' + ' | '.join(row_data) + ' |') content.append('\n'.join(markdown_table)) tables.append({ "sheet_name": sheet_name, "data": table_data }) wb.close() return ParseResult( content="\n\n".join(content), metadata={ "sheet_count": len(wb.sheetnames), "file_size": os.path.getsize(file_path) }, file_type="xlsx", tables=tables ) async def _parse_pptx(self, file_path: str) -> ParseResult: """ 解析PPT文档 Args: file_path: 文件路径 Returns: ParseResult: 解析结果 """ prs = Presentation(file_path) content = [] # 遍历所有幻灯片 for slide_idx, slide in enumerate(prs.slides): content.append(f"# 幻灯片 {slide_idx + 1}") # 提取标题 for shape in slide.shapes: if hasattr(shape, 'text_frame') and shape.text_frame.text: if shape == slide.shapes[0]: # 假设第一个形状是标题 content.append(f"## {shape.text_frame.text}") else: content.append(shape.text_frame.text) # 提取备注 if slide.notes_slide: notes = slide.notes_slide.notes_text_frame.text if notes: content.append(f"### 备注\n{notes}") return ParseResult( content="\n\n".join(content), metadata={ "slide_count": len(prs.slides), "file_size": os.path.getsize(file_path) }, file_type="pptx" ) async def _parse_doc(self, file_path: str) -> ParseResult: """ 解析.doc文件 Args: file_path: 文件路径 Returns: ParseResult: 解析结果 """ # 使用antiword提取.doc文件内容 import subprocess try: result = subprocess.run( ['antiword', file_path], capture_output=True, text=True, check=True ) text = result.stdout except Exception as e: log.error(f"antiword解析失败: {str(e)}") raise Exception(f"antiword解析失败: {str(e)}") content = [text] return ParseResult( content="\n".join(content), metadata={ "file_size": os.path.getsize(file_path) }, file_type="doc" )