| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- from core.router import Parser
- from models.result import ParseResult
- from utils.logger import log
- import fitz # PyMuPDF
- from docx import Document
- import openpyxl
- from pptx import Presentation
- import os
class NativeDocParser(Parser):
    """Parser for native documents: Office files and text-based PDFs.

    ``parse`` dispatches on the file extension to one of the private
    ``_parse_*`` methods. Each of them returns a ``ParseResult`` whose
    ``content`` is Markdown-flavoured text.
    """

    @staticmethod
    def _markdown_row(cells) -> str:
        """Render one Markdown table row from a sequence of cell strings.

        Pipes are escaped and line breaks are replaced with ``<br>`` because
        either one inside a cell would otherwise break the table layout.
        """
        escaped = [
            cell.replace('|', '\\|')
            .replace('\r\n', '<br>')
            .replace('\n', '<br>')
            .replace('\r', '<br>')
            for cell in cells
        ]
        return '| ' + ' | '.join(escaped) + ' |'

    @classmethod
    def _markdown_table(cls, header, rows) -> str:
        """Render a Markdown table from a header row and the data rows."""
        lines = [
            cls._markdown_row(header),
            '| ' + ' | '.join(['---'] * len(header)) + ' |',
        ]
        lines.extend(cls._markdown_row(row) for row in rows)
        return '\n'.join(lines)

    @staticmethod
    def _heading_level(style_name):
        """Return the level encoded in a ``Heading N`` style name, else None.

        Style names that start with ``Heading`` but carry no usable number
        (for example ``Heading`` or ``Heading1``) yield None so the caller
        can treat the paragraph as body text instead of failing.
        """
        if not style_name or not style_name.startswith('Heading'):
            return None
        try:
            level = int(style_name.split(' ')[1])
        except (IndexError, ValueError):
            return None
        return level if level >= 1 else None

    async def parse(self, file_path: str) -> ParseResult:
        """Parse a native document, choosing the parser by file extension.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: The parsed document. On any failure (including an
            unsupported extension) the error is logged and a result with
            empty content, ``metadata={"error": ...}`` and
            ``file_type="unknown"`` is returned instead of raising.
        """
        log.info(f"开始解析原生文档: {file_path}")
        handlers = {
            '.pdf': self._parse_pdf,
            '.docx': self._parse_docx,
            '.doc': self._parse_doc,
            '.xlsx': self._parse_xlsx,
            '.pptx': self._parse_pptx,
        }
        try:
            # The extension is compared case-insensitively.
            ext = os.path.splitext(file_path)[1].lower()
            handler = handlers.get(ext)
            if handler is None:
                raise ValueError(f"不支持的文件扩展名: {ext}")
            return await handler(file_path)
        except Exception as e:
            # Boundary handler: callers always receive a ParseResult.
            log.error(f"原生文档解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="unknown"
            )

    async def _parse_pdf(self, file_path: str) -> ParseResult:
        """Extract the text of every page of a PDF.

        Args:
            file_path: Path of the PDF file.

        Returns:
            ParseResult: Page texts, each under a ``# 第N页`` heading, plus
            ``page_count`` and ``file_size`` metadata. ``tables`` is always
            empty because no table extraction is performed here.
        """
        doc = fitz.open(file_path)
        try:
            page_count = len(doc)
            content = [
                f"# 第{page_num}页\n{page.get_text()}"
                for page_num, page in enumerate(doc, start=1)
            ]
        finally:
            # Release the file handle even if text extraction fails.
            doc.close()

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "page_count": page_count,
                "file_size": os.path.getsize(file_path)
            },
            file_type="pdf",
            tables=[]
        )

    async def _parse_docx(self, file_path: str) -> ParseResult:
        """Convert a .docx file's paragraphs and tables to Markdown.

        Args:
            file_path: Path of the .docx file.

        Returns:
            ParseResult: All paragraphs first (``Heading N`` styles become
            Markdown headings), followed by all tables, plus
            ``paragraph_count``, ``table_count`` and ``file_size`` metadata.
            ``tables`` holds the raw cell text of every non-empty table.
        """
        doc = Document(file_path)
        content = []
        tables = []

        for para in doc.paragraphs:
            style = para.style
            level = self._heading_level(style.name if style is not None else None)
            if level is not None:
                content.append(f"{'#' * level} {para.text}")
            else:
                content.append(para.text)

        for table_idx, table in enumerate(doc.tables):
            rows = [
                [cell.text.strip() for cell in row.cells]
                for row in table.rows
            ]
            if not rows:
                # A table without rows has no header to render.
                continue

            # The first row is treated as the header.
            content.append(self._markdown_table(rows[0], rows[1:]))
            tables.append({
                "table_id": table_idx,
                "data": rows
            })

        return ParseResult(
            content="\n".join(content),
            metadata={
                "paragraph_count": len(doc.paragraphs),
                "table_count": len(doc.tables),
                "file_size": os.path.getsize(file_path)
            },
            file_type="docx",
            tables=tables
        )

    async def _parse_xlsx(self, file_path: str) -> ParseResult:
        """Convert every worksheet of an .xlsx workbook to a Markdown table.

        Args:
            file_path: Path of the .xlsx file.

        Returns:
            ParseResult: One ``# 工作表: <name>`` section per worksheet with
            the first row rendered as the table header, plus ``sheet_count``
            and ``file_size`` metadata. ``tables`` holds the stringified cell
            values per sheet.
        """
        wb = openpyxl.load_workbook(file_path)
        content = []
        tables = []
        try:
            sheet_count = len(wb.sheetnames)

            # wb.worksheets skips chart sheets, which have no cell grid.
            for sheet in wb.worksheets:
                sheet_name = sheet.title
                content.append(f"# 工作表: {sheet_name}")

                # Explicit bounds keep the full max_row x max_column grid.
                # Only None becomes '' so that 0 and False are preserved.
                table_data = [
                    ['' if value is None else str(value) for value in row]
                    for row in sheet.iter_rows(
                        min_row=1,
                        max_row=sheet.max_row,
                        min_col=1,
                        max_col=sheet.max_column,
                        values_only=True,
                    )
                ]

                header = table_data[0] if table_data else []
                if header:
                    content.append(self._markdown_table(header, table_data[1:]))

                tables.append({
                    "sheet_name": sheet_name,
                    "data": table_data
                })
        finally:
            # Close the workbook even if a sheet fails to convert.
            wb.close()

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "sheet_count": sheet_count,
                "file_size": os.path.getsize(file_path)
            },
            file_type="xlsx",
            tables=tables
        )

    async def _parse_pptx(self, file_path: str) -> ParseResult:
        """Extract slide text and speaker notes from a .pptx file.

        Args:
            file_path: Path of the .pptx file.

        Returns:
            ParseResult: One ``# 幻灯片 N`` section per slide containing the
            text of each shape and, when present, the notes under
            ``### 备注``, plus ``slide_count`` and ``file_size`` metadata.
        """
        prs = Presentation(file_path)
        content = []

        for slide_idx, slide in enumerate(prs.slides):
            content.append(f"# 幻灯片 {slide_idx + 1}")

            for shape_idx, shape in enumerate(slide.shapes):
                if not (hasattr(shape, 'text_frame') and shape.text_frame.text):
                    continue
                if shape_idx == 0:
                    # Heuristic: the first shape on the slide is the title.
                    content.append(f"## {shape.text_frame.text}")
                else:
                    content.append(shape.text_frame.text)

            # Reading slide.notes_slide creates a notes slide when none
            # exists, so check has_notes_slide first.
            if slide.has_notes_slide:
                notes_frame = slide.notes_slide.notes_text_frame
                # notes_text_frame is None when there is no notes placeholder.
                notes = notes_frame.text if notes_frame is not None else ""
                if notes:
                    content.append(f"### 备注\n{notes}")

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "slide_count": len(prs.slides),
                "file_size": os.path.getsize(file_path)
            },
            file_type="pptx"
        )

    async def _parse_doc(self, file_path: str) -> ParseResult:
        """Extract plain text from a legacy .doc file with ``antiword``.

        Args:
            file_path: Path of the .doc file.

        Returns:
            ParseResult: The text written by antiword to stdout, plus
            ``file_size`` metadata.

        Raises:
            Exception: If antiword cannot be started or exits with a
            non-zero status. The original exception is chained as the cause.
        """
        import subprocess

        try:
            # The argument list is passed without a shell, so the path is
            # never interpreted by one.
            result = subprocess.run(
                ['antiword', file_path],
                capture_output=True,
                text=True,
                check=True
            )
        except Exception as e:
            detail = str(e)
            # CalledProcessError carries antiword's own diagnostics in stderr.
            stderr = getattr(e, "stderr", None)
            if stderr and stderr.strip():
                detail = f"{detail}: {stderr.strip()}"
            log.error(f"antiword解析失败: {detail}")
            raise Exception(f"antiword解析失败: {detail}") from e

        return ParseResult(
            content=result.stdout,
            metadata={
                "file_size": os.path.getsize(file_path)
            },
            file_type="doc"
        )
|