import asyncio
import io
import json
import os
import re
import tempfile
import time
import zipfile
from pathlib import Path
from typing import List, Optional

import httpx

from core.router import Parser
from models.result import ParseResult
from utils.logger import log

# NOTE: PaddleOCR is deliberately not imported at module level, to avoid
# initializing it on import.


class VisualDocParser(Parser):
    """Parser for visual documents (images and scanned PDFs).

    The public entry point, ``parse``, only uses the locally deployed MinerU
    HTTP service. The PaddleOCR helpers are kept but are not called by
    ``parse``; they require an OCR engine to be assigned to ``self.ocr``.
    """

    def __init__(self):
        # MinerU API configuration - a locally deployed service is used.
        self.mineru_api_key = ""
        self.base_url = "http://10.192.72.13:7284"
        self.model_version = "hybrid-auto-engine"
        self.poll_interval_sec = 3.0
        self.max_wait_sec = 300.0
        # No OCR engine is created here. Defining the attribute lets
        # `_use_paddleocr` report "not initialized" instead of raising
        # AttributeError.
        self.ocr = None
        log.info("VisualDocParser初始化完成,使用本地部署的MinerU服务")

    async def parse(self, file_path: str) -> ParseResult:
        """Parse a visual document.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: The MinerU result on success; otherwise a result with
            empty content and an ``error`` entry in its metadata.
        """
        log.info(f"开始解析视觉文档: {file_path}")

        try:
            # Only the MinerU API is used, which avoids PaddleOCR
            # initialization problems.
            result = await self._try_mineru(file_path)
            if result is not None:
                return result

            # MinerU failed: report the failure through the metadata.
            return ParseResult(
                content="",
                metadata={"error": "MinerU API解析失败"},
                file_type="visual"
            )
        except Exception as e:
            # Top-level boundary: callers always get a ParseResult back.
            log.error(f"视觉文档解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="visual"
            )

    async def _try_mineru(self, file_path: str) -> Optional[ParseResult]:
        """Try to parse the file with the local MinerU API.

        Args:
            file_path: Path of the file to parse.

        Returns:
            Optional[ParseResult]: The parse result, or ``None`` if anything
            went wrong (the error is logged as a warning).
        """
        try:
            log.info("开始使用本地MinerU API解析文件")

            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                raise FileNotFoundError(str(file_path))

            log.info(f"Calling local MinerU for file: {file_path}")

            # Upload the file to the local API and read the result directly.
            result = await self._upload_and_parse(file_path_obj)

            # Pull the text out of the response.
            text_content = self._extract_text_from_local_result(result)

            return ParseResult(
                content=text_content,
                metadata={
                    "parser": "Local MinerU API",
                    "file_size": file_path_obj.stat().st_size,
                    "backend": self.model_version
                },
                file_type="visual"
            )
        except Exception as e:
            # Deliberate best effort: any failure is reported as None.
            log.warning(f"本地MinerU API解析失败: {str(e)}")
            return None

    async def _upload_and_parse(self, file_path: Path) -> dict:
        """Upload the file to MinerU's ``/file_parse`` endpoint.

        Args:
            file_path: Path of the file to upload.

        Returns:
            dict: The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
            OSError: If the file cannot be opened.
        """
        url = f"{self.base_url}/file_parse"

        # Request parameters sent as form fields.
        data = {
            "backend": self.model_version,
            "lang_list": ["ch"],
            "return_md": True,
            "formula_enable": True,
            "table_enable": True
        }

        # The file handle must stay open while the request is sent and must be
        # closed afterwards; the previous bare open() leaked it.
        with open(file_path, 'rb') as upload:
            files = {"files": (file_path.name, upload)}
            # The configured wait budget is used as the HTTP timeout (300 s).
            async with httpx.AsyncClient(timeout=self.max_wait_sec) as client:
                resp = await client.post(url, files=files, data=data)

        resp.raise_for_status()
        return resp.json()

    @staticmethod
    def _texts_from_items(items) -> List[str]:
        """Collect text from a list whose items are strings or ``{"text": ...}`` dicts."""
        texts = []
        for item in items:
            if isinstance(item, dict) and "text" in item:
                texts.append(str(item["text"]))
            elif isinstance(item, str):
                texts.append(item)
        return texts

    @classmethod
    def _texts_from_content(cls, content) -> List[str]:
        """Collect text from a ``content`` value that is a string or a list; other types yield nothing."""
        if isinstance(content, str):
            return [content]
        if isinstance(content, list):
            return cls._texts_from_items(content)
        return []

    def _extract_text_from_local_result(self, result: dict) -> str:
        """Extract the text from a local MinerU response.

        The first matching top-level key wins, checked in this order:
        ``results`` (a dict of per-entry dicts holding ``md_content`` or
        ``text``), ``markdown``, ``text``, ``content``.

        Args:
            result: The response returned by the local MinerU service.

        Returns:
            str: The extracted parts joined by blank lines; an empty string if
            nothing matched or ``result`` is not a dict.
        """
        text_parts = []

        if isinstance(result, dict):
            if "results" in result:
                # Newer response layout: one entry per file.
                results = result["results"]
                if isinstance(results, dict):
                    for value in results.values():
                        if not isinstance(value, dict):
                            continue
                        if "md_content" in value:
                            text_parts.append(str(value["md_content"]))
                        elif "text" in value:
                            text_parts.append(str(value["text"]))
            elif "markdown" in result:
                text_parts.append(str(result["markdown"]))
            elif "text" in result:
                text_parts.append(str(result["text"]))
            elif "content" in result:
                text_parts.extend(self._texts_from_content(result["content"]))

        return "\n\n".join(text_parts)

    def _safe_stem(self, stem: str) -> str:
        """Create a safe cache key from a file stem.

        Args:
            stem: The file stem.

        Returns:
            str: ``stem`` with every character outside ``[a-zA-Z0-9_-]``
            replaced by an underscore.
        """
        return re.sub(r'[^a-zA-Z0-9_-]', '_', stem)

    def _extract_text_from_payload(self, payload: dict) -> str:
        """Extract the text from a MinerU payload.

        For a dict, the first matching key wins, checked in this order:
        ``text``, ``content``, ``pages``. A list payload is treated as a list
        of strings or ``{"text": ...}`` dicts.

        Args:
            payload: The payload returned by MinerU.

        Returns:
            str: The extracted parts joined by blank lines.
        """
        text_parts = []

        if isinstance(payload, dict):
            if "text" in payload:
                text_parts.append(str(payload["text"]))
            elif "content" in payload:
                text_parts.extend(self._texts_from_content(payload["content"]))
            elif "pages" in payload:
                for page_num, page_content in enumerate(payload["pages"], 1):
                    # A heading is emitted for every page, even when the page
                    # yields no text.
                    text_parts.append(f"# 第{page_num}页")
                    if isinstance(page_content, str):
                        text_parts.append(page_content)
                    elif isinstance(page_content, dict) and "text" in page_content:
                        text_parts.append(str(page_content["text"]))
        elif isinstance(payload, list):
            text_parts.extend(self._texts_from_items(payload))

        return "\n\n".join(text_parts)

    @staticmethod
    def _collect_ocr_text(ocr_result) -> List[str]:
        """Flatten an OCR result into the recognized text strings.

        Each entry is indexed as ``word_info[1][0]``, as the original code did.
        Empty or ``None`` results and lines are skipped.
        """
        # NOTE(review): presumably PaddleOCR can return None for an image with
        # no detected text - confirm against the installed version.
        texts = []
        for line in ocr_result or []:
            if not line:
                continue
            for word_info in line:
                texts.append(word_info[1][0])
        return texts

    async def _use_paddleocr(self, file_path: str) -> ParseResult:
        """Parse the file with PaddleOCR.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: The OCR result, or an error result when no OCR engine
            has been assigned to ``self.ocr``.
        """
        log.info("使用PaddleOCR解析视觉文档")

        # Bail out when no OCR engine is available.
        if self.ocr is None:
            log.error("PaddleOCR未初始化,无法解析")
            return ParseResult(
                content="",
                metadata={"error": "PaddleOCR未初始化"},
                file_type="visual"
            )

        # PDFs are rendered to images first. The extension check is
        # case-insensitive so that ".PDF" is handled as a PDF too.
        if file_path.lower().endswith('.pdf'):
            return await self._ocr_pdf(file_path)
        return await self._ocr_image(file_path)

    async def _ocr_pdf(self, file_path: str) -> ParseResult:
        """Run OCR over every page of a PDF file.

        Each page is rendered to a PNG at 300 dpi inside a private temporary
        directory, so concurrent calls cannot overwrite each other's images
        and nothing is left behind if OCR fails.

        Args:
            file_path: Path of the PDF file.

        Returns:
            ParseResult: One section per page, each headed by the page number.
        """
        import fitz  # PyMuPDF

        content = []
        doc = fitz.open(file_path)
        try:
            page_count = len(doc)
            with tempfile.TemporaryDirectory() as tmp_dir:
                for page_num, page in enumerate(doc, start=1):
                    # Render the page to an image file for the OCR engine.
                    img_path = os.path.join(tmp_dir, f"temp_page_{page_num}.png")
                    page.get_pixmap(dpi=300).save(img_path)

                    ocr_result = self.ocr.ocr(img_path, cls=True)
                    # Free the disk space early; the directory cleanup covers
                    # the failure path.
                    os.remove(img_path)

                    page_text = " ".join(self._collect_ocr_text(ocr_result))
                    content.append(f"# 第{page_num}页\n{page_text}")
        finally:
            # Close the document even when rendering or OCR raises.
            doc.close()

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "parser": "PaddleOCR",
                "page_count": page_count,
                "file_size": os.path.getsize(file_path)
            },
            file_type="pdf_scanned"
        )

    async def _ocr_image(self, file_path: str) -> ParseResult:
        """Run OCR over a single image file.

        Args:
            file_path: Path of the image file.

        Returns:
            ParseResult: The recognized text fragments joined by spaces.
        """
        ocr_result = self.ocr.ocr(file_path, cls=True)
        content = self._collect_ocr_text(ocr_result)

        return ParseResult(
            content=' '.join(content),
            metadata={
                "parser": "PaddleOCR",
                "file_size": os.path.getsize(file_path)
            },
            file_type="image"
        )