| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- from core.router import Parser
- from models.result import ParseResult
- from utils.logger import log
- import os
- import time
- import httpx
- import json
- import io
- import zipfile
- from pathlib import Path
- import asyncio
- # 延迟导入PaddleOCR,避免模块级初始化
class VisualDocParser(Parser):
    """Parser for visual documents (images and scanned PDFs).

    The active path (`parse`) uploads the file to a locally deployed MinerU
    HTTP service and extracts the text from its JSON response. The PaddleOCR
    helpers further down are a legacy fallback that `parse` does not call.
    """

    def __init__(self):
        """Store the connection settings for the local MinerU service."""
        # MinerU API configuration - uses the locally deployed service.
        self.mineru_api_key = ""
        self.base_url = "http://10.192.72.13:7284"
        self.model_version = "hybrid-auto-engine"
        self.poll_interval_sec = 3.0
        self.max_wait_sec = 300.0
        log.info("VisualDocParser初始化完成,使用本地部署的MinerU服务")

    async def parse(self, file_path: str) -> ParseResult:
        """Parse a visual document.

        Only the MinerU API is used here; PaddleOCR is deliberately not
        touched so that its initialisation problems cannot affect parsing.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: The parsed content, or a result with empty content
            and an ``"error"`` entry in its metadata when parsing failed.
        """
        log.info(f"开始解析视觉文档: {file_path}")
        try:
            result = await self._try_mineru(file_path)
            # None is the failure sentinel of _try_mineru.
            if result is not None:
                return result

            # MinerU failed: report the failure instead of raising.
            return ParseResult(
                content="",
                metadata={"error": "MinerU API解析失败"},
                file_type="visual",
            )
        except Exception as e:
            log.error(f"视觉文档解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="visual",
            )

    async def _try_mineru(self, file_path: str) -> "ParseResult | None":
        """Try to parse the file with the local MinerU API.

        Args:
            file_path: Path of the file to parse.

        Returns:
            The parse result, or None when anything went wrong (missing
            file, HTTP error, unexpected response). The error is logged as
            a warning rather than propagated.
        """
        try:
            log.info("开始使用本地MinerU API解析文件")
            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                raise FileNotFoundError(str(file_path))
            log.info(f"Calling local MinerU for file: {file_path}")

            # Upload the file to the local API and wait for the result.
            result = await self._upload_and_parse(file_path_obj)
            text_content = self._extract_text_from_local_result(result)

            return ParseResult(
                content=text_content,
                metadata={
                    "parser": "Local MinerU API",
                    "file_size": file_path_obj.stat().st_size,
                    "backend": self.model_version,
                },
                file_type="visual",
            )
        except Exception as e:
            # Best effort by design: the caller turns None into an error result.
            log.warning(f"本地MinerU API解析失败: {str(e)}")
            return None

    async def _upload_and_parse(self, file_path: Path) -> dict:
        """Upload the file to MinerU's ``/file_parse`` endpoint.

        Args:
            file_path: Path of the file to upload.

        Returns:
            dict: The decoded JSON body of the response.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
            OSError: If the file cannot be opened.
        """
        url = f"{self.base_url}/file_parse"

        # Form fields sent alongside the file.
        data = {
            "backend": self.model_version,
            "lang_list": ["ch"],
            "return_md": True,
            "formula_enable": True,
            "table_enable": True,
        }

        # The handle must stay open until the request has been sent and is
        # closed afterwards even when the request fails.
        with open(file_path, "rb") as upload:
            files = {"files": (file_path.name, upload)}
            async with httpx.AsyncClient(timeout=self.max_wait_sec) as client:
                resp = await client.post(url, files=files, data=data)
                resp.raise_for_status()
                return resp.json()

    @staticmethod
    def _text_from_items(items) -> list:
        """Collect text from items that are strings or dicts with a "text" key."""
        parts = []
        for item in items:
            if isinstance(item, dict) and "text" in item:
                parts.append(str(item["text"]))
            elif isinstance(item, str):
                parts.append(item)
        return parts

    @classmethod
    def _text_from_content(cls, content) -> list:
        """Collect text from a "content" value that is a string or a list of items."""
        if isinstance(content, str):
            return [content]
        if isinstance(content, list):
            return cls._text_from_items(content)
        return []

    @staticmethod
    def _words_from_ocr_result(ocr_result) -> list:
        """Flatten an OCR result into the recognised text of every entry.

        Each entry is read as ``entry[1][0]``. None in place of the result
        or of a line is skipped - presumably PaddleOCR reports an image
        without detected text that way; verify against the installed version.
        """
        return [
            word_info[1][0]
            for line in (ocr_result or [])
            for word_info in (line or [])
        ]

    def _extract_text_from_local_result(self, result: dict) -> str:
        """Extract the text from a response of the local MinerU service.

        Only the first matching top-level key is used, checked in this
        order: ``results`` (a dict whose dict values carry ``md_content``
        or ``text``), ``markdown``, ``text``, ``content`` (a string or a
        list of strings / dicts with ``text``).

        Args:
            result: Decoded response of the local MinerU service.

        Returns:
            str: The collected parts joined by blank lines; empty when
            nothing matched or ``result`` is not a dict.
        """
        if not isinstance(result, dict):
            return ""

        text_parts = []
        if "results" in result:
            results = result["results"]
            if isinstance(results, dict):
                for value in results.values():
                    if not isinstance(value, dict):
                        continue
                    if "md_content" in value:
                        text_parts.append(str(value["md_content"]))
                    elif "text" in value:
                        text_parts.append(str(value["text"]))
        elif "markdown" in result:
            text_parts.append(str(result["markdown"]))
        elif "text" in result:
            text_parts.append(str(result["text"]))
        elif "content" in result:
            text_parts.extend(self._text_from_content(result["content"]))

        return "\n\n".join(text_parts)

    def _safe_stem(self, stem: str) -> str:
        """Build a safe cache key from a file stem.

        Args:
            stem: The file stem.

        Returns:
            str: ``stem`` with every character outside ``[a-zA-Z0-9_-]``
            replaced by an underscore.
        """
        import re

        return re.sub(r'[^a-zA-Z0-9_-]', '_', stem)

    def _extract_text_from_payload(self, payload: dict) -> str:
        """Extract the text from a MinerU payload.

        A dict payload is read from the first matching key among ``text``,
        ``content`` and ``pages`` (each page is preceded by a page heading).
        A list payload is read item by item (strings or dicts with ``text``).

        Args:
            payload: The payload returned by MinerU.

        Returns:
            str: The collected parts joined by blank lines.
        """
        text_parts = []

        if isinstance(payload, dict):
            if "text" in payload:
                text_parts.append(str(payload["text"]))
            elif "content" in payload:
                text_parts.extend(self._text_from_content(payload["content"]))
            elif "pages" in payload:
                for page_num, page_content in enumerate(payload["pages"], 1):
                    text_parts.append(f"# 第{page_num}页")
                    if isinstance(page_content, str):
                        text_parts.append(page_content)
                    elif isinstance(page_content, dict) and "text" in page_content:
                        text_parts.append(str(page_content["text"]))
        elif isinstance(payload, list):
            text_parts.extend(self._text_from_items(payload))

        return "\n\n".join(text_parts)

    async def _use_paddleocr(self, file_path: str) -> ParseResult:
        """Parse the file with PaddleOCR.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ParseResult: The OCR result, or an error result when no OCR
            engine is available on this instance.
        """
        log.info("使用PaddleOCR解析视觉文档")

        # __init__ never creates the engine, so the attribute may not exist
        # at all; treat "missing" the same as "not initialised".
        if getattr(self, "ocr", None) is None:
            log.error("PaddleOCR未初始化,无法解析")
            return ParseResult(
                content="",
                metadata={"error": "PaddleOCR未初始化"},
                file_type="visual",
            )

        # PDFs are rendered to images first; the extension check ignores case.
        if file_path.lower().endswith(".pdf"):
            return await self._ocr_pdf(file_path)
        return await self._ocr_image(file_path)

    async def _ocr_pdf(self, file_path: str) -> ParseResult:
        """Run OCR over every page of a PDF file.

        Each page is rendered at 300 dpi into a private temporary directory
        and passed to the OCR engine.

        Args:
            file_path: Path of the PDF file.

        Returns:
            ParseResult: One section per page, each introduced by a page
            heading.
        """
        import tempfile

        import fitz  # PyMuPDF

        content = []
        doc = fitz.open(file_path)
        try:
            page_count = len(doc)
            # A private directory keeps concurrent parses from overwriting
            # each other's page images and is removed even if OCR raises.
            with tempfile.TemporaryDirectory() as tmp_dir:
                # One image file is reused for all pages to bound disk usage.
                img_path = os.path.join(tmp_dir, "page.png")
                for page_num, page in enumerate(doc):
                    pix = page.get_pixmap(dpi=300)
                    pix.save(img_path)

                    ocr_result = self.ocr.ocr(img_path, cls=True)
                    page_text = self._words_from_ocr_result(ocr_result)
                    content.append(f"# 第{page_num + 1}页\n{' '.join(page_text)}")
        finally:
            doc.close()

        return ParseResult(
            content="\n\n".join(content),
            metadata={
                "parser": "PaddleOCR",
                "page_count": page_count,
                "file_size": os.path.getsize(file_path),
            },
            file_type="pdf_scanned",
        )

    async def _ocr_image(self, file_path: str) -> ParseResult:
        """Run OCR over a single image file.

        Args:
            file_path: Path of the image file.

        Returns:
            ParseResult: The recognised text fragments joined by spaces.
        """
        ocr_result = self.ocr.ocr(file_path, cls=True)
        content = self._words_from_ocr_result(ocr_result)

        return ParseResult(
            content=' '.join(content),
            metadata={
                "parser": "PaddleOCR",
                "file_size": os.path.getsize(file_path),
            },
            file_type="image",
        )
|