| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- """
- MinerU vLLM 处理器
- 基于 MinerU demo.py 框架的文档处理类
- """
- import os
- import json
- import time
- import traceback
- from pathlib import Path
- from typing import List, Dict, Any
- from loguru import logger
- # 导入 MinerU 核心组件
- from mineru.cli.common import read_fn, convert_pdf_bytes_to_bytes_by_pypdfium2
- from mineru.data.data_reader_writer import FileBasedDataWriter
- from mineru.utils.draw_bbox import draw_layout_bbox
- from mineru.utils.enum_class import MakeMode
- from mineru.backend.vlm.vlm_analyze import doc_analyze as vlm_doc_analyze
- from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make
- # 导入 ocr_utils
- import sys
- ocr_platform_root = Path(__file__).parents[2]
- if str(ocr_platform_root) not in sys.path:
- sys.path.insert(0, str(ocr_platform_root))
- from ocr_utils import normalize_markdown_table, normalize_json_table
- class MinerUVLLMProcessor:
- """MinerU vLLM 处理器 (基于 demo.py 框架)"""
-
- def __init__(self,
- server_url: str = "http://127.0.0.1:8121",
- timeout: int = 300,
- normalize_numbers: bool = False,
- debug: bool = False):
- """
- 初始化处理器
-
- Args:
- server_url: vLLM 服务器地址
- timeout: 请求超时时间
- normalize_numbers: 是否标准化数字
- debug: 是否启用调试模式
- """
- self.server_url = server_url.rstrip('/')
- self.timeout = timeout
- self.normalize_numbers = normalize_numbers
- self.debug = debug
- self.backend = "http-client" # 固定使用 http-client 后端
-
- logger.info(f"MinerU vLLM Processor 初始化完成:")
- logger.info(f" - 服务器: {server_url}")
- logger.info(f" - 后端: vlm-{self.backend}")
- logger.info(f" - 超时: {timeout}s")
- logger.info(f" - 数字标准化: {normalize_numbers}")
- logger.info(f" - 调试模式: {debug}")
-
- def do_parse_single_file(self,
- input_file: str,
- output_dir: str,
- start_page_id: int = 0,
- end_page_id: int | None = None) -> Dict[str, Any]:
- """
- 解析单个文件 (参考 demo.py 的 do_parse 函数)
-
- Args:
- input_file: 文件路径
- output_dir: 输出目录
- start_page_id: 起始页ID
- end_page_id: 结束页ID
-
- Returns:
- dict: 处理结果
- """
- try:
- # 准备文件名和字节数据
- file_path = Path(input_file)
- pdf_file_name = file_path.stem
- pdf_bytes = read_fn(str(file_path))
-
- # 转换PDF字节流 (如果需要)
- if file_path.suffix.lower() == '.pdf':
- pdf_bytes = convert_pdf_bytes_to_bytes_by_pypdfium2(
- pdf_bytes, start_page_id, end_page_id
- )
-
- # 准备环境 (创建输出目录)
- local_md_dir = Path(output_dir).resolve()
- local_image_dir = local_md_dir / "images"
- image_writer = FileBasedDataWriter(local_image_dir.as_posix())
- md_writer = FileBasedDataWriter(local_md_dir.as_posix())
-
- # 使用 VLM 分析文档 (核心调用)
- middle_json, model_output = vlm_doc_analyze(
- pdf_bytes,
- image_writer=image_writer,
- backend=self.backend,
- server_url=self.server_url
- )
-
- pdf_info = middle_json["pdf_info"]
-
- # 处理输出
- output_files = self._process_output(
- pdf_info=pdf_info,
- pdf_bytes=pdf_bytes,
- pdf_file_name=pdf_file_name,
- local_md_dir=local_md_dir,
- local_image_dir=local_image_dir,
- md_writer=md_writer,
- middle_json=middle_json,
- model_output=model_output,
- original_file_path=str(file_path)
- )
-
- # 统计提取信息
- extraction_stats = self._get_extraction_stats(middle_json)
-
- return {
- "success": True,
- "pdf_info": pdf_info,
- "middle_json": middle_json,
- "model_output": model_output,
- "output_files": output_files,
- "extraction_stats": extraction_stats
- }
-
- except Exception as e:
- logger.error(f"Failed to process {file_path}: {e}")
- if self.debug:
- traceback.print_exc()
- return {
- "success": False,
- "error": str(e)
- }
-
- def _process_output(self,
- pdf_info,
- pdf_bytes,
- pdf_file_name,
- local_md_dir,
- local_image_dir,
- md_writer,
- middle_json,
- model_output,
- original_file_path: str) -> Dict[str, str]:
- """
- 处理输出文件
-
- Args:
- pdf_info: PDF信息
- pdf_bytes: PDF字节数据
- pdf_file_name: PDF文件名
- local_md_dir: Markdown目录
- local_image_dir: 图片目录
- md_writer: Markdown写入器
- middle_json: 中间JSON数据
- model_output: 模型输出
- original_file_path: 原始文件路径
-
- Returns:
- dict: 保存的文件路径信息
- """
- saved_files = {}
-
- try:
- # 设置相对图片目录名
- image_dir = str(os.path.basename(local_image_dir))
-
- # 1. 生成并保存 Markdown 文件
- md_content_str = vlm_union_make(pdf_info, MakeMode.MM_MD, image_dir)
-
- # 数字标准化处理
- if self.normalize_numbers:
- original_md = md_content_str
- md_content_str = normalize_markdown_table(md_content_str)
-
- changes_count = len([1 for o, n in zip(original_md, md_content_str) if o != n])
- if changes_count > 0:
- saved_files['md_normalized'] = f"✅ 已标准化 {changes_count} 个字符(全角→半角)"
- else:
- saved_files['md_normalized'] = "ℹ️ 无需标准化(已是标准格式)"
-
- md_writer.write_string(f"{pdf_file_name}.md", md_content_str)
- saved_files['md'] = os.path.join(local_md_dir, f"{pdf_file_name}.md")
-
- # 2. 生成并保存 content_list JSON 文件
- content_list = vlm_union_make(pdf_info, MakeMode.CONTENT_LIST, image_dir)
- content_list_str = json.dumps(content_list, ensure_ascii=False, indent=2)
- md_writer.write_string(f"{pdf_file_name}_original.json", content_list_str)
-
- # 转换bbox坐标(从1000-based到像素坐标)
- if pdf_info and len(pdf_info) > 0:
- page_width, page_height = pdf_info[0].get('page_size', [1000, 1000])
- for element in content_list:
- if "bbox" in element:
- x0, y0, x1, y1 = element["bbox"]
- element["bbox"] = [
- int(x0 / 1000 * page_width),
- int(y0 / 1000 * page_height),
- int(x1 / 1000 * page_width),
- int(y1 / 1000 * page_height),
- ]
- content_list_str = json.dumps(content_list, ensure_ascii=False, indent=2)
- # 数字标准化处理
- if self.normalize_numbers:
- original_json = content_list_str
- content_list_str = normalize_json_table(content_list_str)
-
- changes_count = len([1 for o, n in zip(original_json, content_list_str) if o != n])
- if changes_count > 0:
- saved_files['json_normalized'] = f"✅ 已标准化 {changes_count} 个字符(全角→半角)"
- else:
- saved_files['json_normalized'] = "ℹ️ 无需标准化(已是标准格式)"
-
- md_writer.write_string(f"{pdf_file_name}.json", content_list_str)
- saved_files['json'] = os.path.join(local_md_dir, f"{pdf_file_name}.json")
-
- # 绘制布局边界框
- try:
- draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_layout.pdf")
- saved_files['layout_pdf'] = os.path.join(local_md_dir, f"{pdf_file_name}_layout.pdf")
- except Exception as e:
- logger.warning(f"Failed to draw layout bbox: {e}")
-
- # 调试模式下保存额外信息
- if self.debug:
- # 保存 middle.json
- middle_json_str = json.dumps(middle_json, ensure_ascii=False, indent=2)
- if self.normalize_numbers:
- middle_json_str = normalize_json_table(middle_json_str)
-
- md_writer.write_string(f"{pdf_file_name}_middle.json", middle_json_str)
- saved_files['middle_json'] = os.path.join(local_md_dir, f"{pdf_file_name}_middle.json")
-
- # 保存 model output
- if model_output:
- model_output_str = json.dumps(model_output, ensure_ascii=False, indent=2)
- md_writer.write_string(f"{pdf_file_name}_model.json", model_output_str)
- saved_files['model_output'] = os.path.join(local_md_dir, f"{pdf_file_name}_model.json")
-
- logger.info(f"Output saved to: {local_md_dir}")
-
- except Exception as e:
- logger.error(f"Error in _process_output: {e}")
- if self.debug:
- traceback.print_exc()
-
- return saved_files
-
- def _get_extraction_stats(self, middle_json: Dict) -> Dict[str, Any]:
- """
- 获取提取统计信息
-
- Args:
- middle_json: 中间JSON数据
-
- Returns:
- dict: 统计信息
- """
- stats = {
- "total_blocks": 0,
- "block_types": {},
- "total_pages": 0
- }
-
- try:
- pdf_info = middle_json.get("pdf_info", [])
- if isinstance(pdf_info, list):
- stats["total_pages"] = len(pdf_info)
-
- for page_info in pdf_info:
- para_blocks = page_info.get("para_blocks", [])
- stats["total_blocks"] += len(para_blocks)
-
- for block in para_blocks:
- block_type = block.get("type", "unknown")
- stats["block_types"][block_type] = stats["block_types"].get(block_type, 0) + 1
-
- except Exception as e:
- logger.warning(f"Failed to get extraction stats: {e}")
-
- return stats
-
- def process_single_image(self, image_path: str, output_dir: str) -> Dict[str, Any]:
- """
- 处理单张图片
-
- Args:
- image_path: 图片路径
- output_dir: 输出目录
-
- Returns:
- dict: 处理结果,包含 success 字段(基于输出文件存在性判断)
- """
- start_time = time.time()
- image_path_obj = Path(image_path)
- image_name = image_path_obj.stem
-
- # 判断是否为PDF页面(根据文件名模式)
- is_pdf_page = "_page_" in image_path_obj.name
-
- # 根据输入类型生成预期的输出文件名
- if is_pdf_page:
- # PDF页面:文件名格式为 filename_page_001.png
- # 输出文件名:filename_page_001.md 和 filename_page_001.json
- expected_md_path = Path(output_dir) / f"{image_name}.md"
- expected_json_path = Path(output_dir) / f"{image_name}.json"
- else:
- # 普通图片:输出文件名:filename.md 和 filename.json
- expected_md_path = Path(output_dir) / f"{image_name}.md"
- expected_json_path = Path(output_dir) / f"{image_name}.json"
-
- result_info = {
- "image_path": image_path,
- "processing_time": 0,
- "success": False,
- "server": self.server_url,
- "error": None,
- "output_files": {},
- "is_pdf_page": is_pdf_page,
- "extraction_stats": {}
- }
-
- try:
- # 检查输出文件是否已存在(成功判断标准)
- if expected_md_path.exists() and expected_json_path.exists():
- result_info.update({
- "success": True,
- "processing_time": 0,
- "output_files": {
- "md": str(expected_md_path),
- "json": str(expected_json_path)
- },
- "skipped": True
- })
- logger.info(f"✅ 文件已存在,跳过处理: {image_name}")
- return result_info
-
- # 使用 do_parse_single_file 处理
- parse_result = self.do_parse_single_file(image_path, output_dir)
-
- # 处理完成后,再次检查输出文件是否存在(成功判断标准)
- if expected_md_path.exists() and expected_json_path.exists():
- result_info.update({
- "success": True,
- "output_files": parse_result.get("output_files", {}),
- "extraction_stats": parse_result.get("extraction_stats", {})
- })
- logger.info(f"✅ 处理成功: {image_name}")
- else:
- # 文件不存在,标记为失败
- missing_files = []
- if not expected_md_path.exists():
- missing_files.append("md")
- if not expected_json_path.exists():
- missing_files.append("json")
- result_info["error"] = f"输出文件不存在: {', '.join(missing_files)}"
- result_info["success"] = False
- logger.error(f"❌ 处理失败: {image_name} - {result_info['error']}")
-
- except Exception as e:
- result_info["error"] = str(e)
- result_info["success"] = False
- logger.error(f"Error processing {image_name}: {e}")
- if self.debug:
- traceback.print_exc()
-
- finally:
- result_info["processing_time"] = time.time() - start_time
-
- return result_info
|