""" MinerU vLLM 处理器 基于 MinerU demo.py 框架的文档处理类 """ import os import json import time import traceback from pathlib import Path from typing import List, Dict, Any from loguru import logger # 导入 MinerU 核心组件 from mineru.cli.common import read_fn, convert_pdf_bytes_to_bytes_by_pypdfium2 from mineru.data.data_reader_writer import FileBasedDataWriter from mineru.utils.draw_bbox import draw_layout_bbox from mineru.utils.enum_class import MakeMode from mineru.backend.vlm.vlm_analyze import doc_analyze as vlm_doc_analyze from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make # 导入 ocr_utils import sys ocr_platform_root = Path(__file__).parents[2] if str(ocr_platform_root) not in sys.path: sys.path.insert(0, str(ocr_platform_root)) from ocr_utils import normalize_markdown_table, normalize_json_table class MinerUVLLMProcessor: """MinerU vLLM 处理器 (基于 demo.py 框架)""" def __init__(self, server_url: str = "http://127.0.0.1:8121", timeout: int = 300, normalize_numbers: bool = False, debug: bool = False): """ 初始化处理器 Args: server_url: vLLM 服务器地址 timeout: 请求超时时间 normalize_numbers: 是否标准化数字 debug: 是否启用调试模式 """ self.server_url = server_url.rstrip('/') self.timeout = timeout self.normalize_numbers = normalize_numbers self.debug = debug self.backend = "http-client" # 固定使用 http-client 后端 logger.info(f"MinerU vLLM Processor 初始化完成:") logger.info(f" - 服务器: {server_url}") logger.info(f" - 后端: vlm-{self.backend}") logger.info(f" - 超时: {timeout}s") logger.info(f" - 数字标准化: {normalize_numbers}") logger.info(f" - 调试模式: {debug}") def do_parse_single_file(self, input_file: str, output_dir: str, start_page_id: int = 0, end_page_id: int | None = None) -> Dict[str, Any]: """ 解析单个文件 (参考 demo.py 的 do_parse 函数) Args: input_file: 文件路径 output_dir: 输出目录 start_page_id: 起始页ID end_page_id: 结束页ID Returns: dict: 处理结果 """ try: # 准备文件名和字节数据 file_path = Path(input_file) pdf_file_name = file_path.stem pdf_bytes = read_fn(str(file_path)) # 转换PDF字节流 (如果需要) if file_path.suffix.lower() == '.pdf': pdf_bytes = convert_pdf_bytes_to_bytes_by_pypdfium2( pdf_bytes, start_page_id, end_page_id ) # 准备环境 (创建输出目录) local_md_dir = Path(output_dir).resolve() local_image_dir = local_md_dir / "images" image_writer = FileBasedDataWriter(local_image_dir.as_posix()) md_writer = FileBasedDataWriter(local_md_dir.as_posix()) # 使用 VLM 分析文档 (核心调用) middle_json, model_output = vlm_doc_analyze( pdf_bytes, image_writer=image_writer, backend=self.backend, server_url=self.server_url ) pdf_info = middle_json["pdf_info"] # 处理输出 output_files = self._process_output( pdf_info=pdf_info, pdf_bytes=pdf_bytes, pdf_file_name=pdf_file_name, local_md_dir=local_md_dir, local_image_dir=local_image_dir, md_writer=md_writer, middle_json=middle_json, model_output=model_output, original_file_path=str(file_path) ) # 统计提取信息 extraction_stats = self._get_extraction_stats(middle_json) return { "success": True, "pdf_info": pdf_info, "middle_json": middle_json, "model_output": model_output, "output_files": output_files, "extraction_stats": extraction_stats } except Exception as e: logger.error(f"Failed to process {file_path}: {e}") if self.debug: traceback.print_exc() return { "success": False, "error": str(e) } def _process_output(self, pdf_info, pdf_bytes, pdf_file_name, local_md_dir, local_image_dir, md_writer, middle_json, model_output, original_file_path: str) -> Dict[str, str]: """ 处理输出文件 Args: pdf_info: PDF信息 pdf_bytes: PDF字节数据 pdf_file_name: PDF文件名 local_md_dir: Markdown目录 local_image_dir: 图片目录 md_writer: Markdown写入器 middle_json: 中间JSON数据 model_output: 模型输出 original_file_path: 原始文件路径 Returns: dict: 保存的文件路径信息 """ saved_files = {} try: # 设置相对图片目录名 image_dir = str(os.path.basename(local_image_dir)) # 1. 生成并保存 Markdown 文件 md_content_str = vlm_union_make(pdf_info, MakeMode.MM_MD, image_dir) # 数字标准化处理 if self.normalize_numbers: original_md = md_content_str md_content_str = normalize_markdown_table(md_content_str) changes_count = len([1 for o, n in zip(original_md, md_content_str) if o != n]) if changes_count > 0: saved_files['md_normalized'] = f"✅ 已标准化 {changes_count} 个字符(全角→半角)" else: saved_files['md_normalized'] = "ℹ️ 无需标准化(已是标准格式)" md_writer.write_string(f"{pdf_file_name}.md", md_content_str) saved_files['md'] = os.path.join(local_md_dir, f"{pdf_file_name}.md") # 2. 生成并保存 content_list JSON 文件 content_list = vlm_union_make(pdf_info, MakeMode.CONTENT_LIST, image_dir) content_list_str = json.dumps(content_list, ensure_ascii=False, indent=2) md_writer.write_string(f"{pdf_file_name}_original.json", content_list_str) # 转换bbox坐标(从1000-based到像素坐标) if pdf_info and len(pdf_info) > 0: page_width, page_height = pdf_info[0].get('page_size', [1000, 1000]) for element in content_list: if "bbox" in element: x0, y0, x1, y1 = element["bbox"] element["bbox"] = [ int(x0 / 1000 * page_width), int(y0 / 1000 * page_height), int(x1 / 1000 * page_width), int(y1 / 1000 * page_height), ] content_list_str = json.dumps(content_list, ensure_ascii=False, indent=2) # 数字标准化处理 if self.normalize_numbers: original_json = content_list_str content_list_str = normalize_json_table(content_list_str) changes_count = len([1 for o, n in zip(original_json, content_list_str) if o != n]) if changes_count > 0: saved_files['json_normalized'] = f"✅ 已标准化 {changes_count} 个字符(全角→半角)" else: saved_files['json_normalized'] = "ℹ️ 无需标准化(已是标准格式)" md_writer.write_string(f"{pdf_file_name}.json", content_list_str) saved_files['json'] = os.path.join(local_md_dir, f"{pdf_file_name}.json") # 绘制布局边界框 try: draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_layout.pdf") saved_files['layout_pdf'] = os.path.join(local_md_dir, f"{pdf_file_name}_layout.pdf") except Exception as e: logger.warning(f"Failed to draw layout bbox: {e}") # 调试模式下保存额外信息 if self.debug: # 保存 middle.json middle_json_str = json.dumps(middle_json, ensure_ascii=False, indent=2) if self.normalize_numbers: middle_json_str = normalize_json_table(middle_json_str) md_writer.write_string(f"{pdf_file_name}_middle.json", middle_json_str) saved_files['middle_json'] = os.path.join(local_md_dir, f"{pdf_file_name}_middle.json") # 保存 model output if model_output: model_output_str = json.dumps(model_output, ensure_ascii=False, indent=2) md_writer.write_string(f"{pdf_file_name}_model.json", model_output_str) saved_files['model_output'] = os.path.join(local_md_dir, f"{pdf_file_name}_model.json") logger.info(f"Output saved to: {local_md_dir}") except Exception as e: logger.error(f"Error in _process_output: {e}") if self.debug: traceback.print_exc() return saved_files def _get_extraction_stats(self, middle_json: Dict) -> Dict[str, Any]: """ 获取提取统计信息 Args: middle_json: 中间JSON数据 Returns: dict: 统计信息 """ stats = { "total_blocks": 0, "block_types": {}, "total_pages": 0 } try: pdf_info = middle_json.get("pdf_info", []) if isinstance(pdf_info, list): stats["total_pages"] = len(pdf_info) for page_info in pdf_info: para_blocks = page_info.get("para_blocks", []) stats["total_blocks"] += len(para_blocks) for block in para_blocks: block_type = block.get("type", "unknown") stats["block_types"][block_type] = stats["block_types"].get(block_type, 0) + 1 except Exception as e: logger.warning(f"Failed to get extraction stats: {e}") return stats def process_single_image(self, image_path: str, output_dir: str) -> Dict[str, Any]: """ 处理单张图片 Args: image_path: 图片路径 output_dir: 输出目录 Returns: dict: 处理结果,包含 success 字段(基于输出文件存在性判断) """ start_time = time.time() image_path_obj = Path(image_path) image_name = image_path_obj.stem # 判断是否为PDF页面(根据文件名模式) is_pdf_page = "_page_" in image_path_obj.name # 根据输入类型生成预期的输出文件名 if is_pdf_page: # PDF页面:文件名格式为 filename_page_001.png # 输出文件名:filename_page_001.md 和 filename_page_001.json expected_md_path = Path(output_dir) / f"{image_name}.md" expected_json_path = Path(output_dir) / f"{image_name}.json" else: # 普通图片:输出文件名:filename.md 和 filename.json expected_md_path = Path(output_dir) / f"{image_name}.md" expected_json_path = Path(output_dir) / f"{image_name}.json" result_info = { "image_path": image_path, "processing_time": 0, "success": False, "server": self.server_url, "error": None, "output_files": {}, "is_pdf_page": is_pdf_page, "extraction_stats": {} } try: # 检查输出文件是否已存在(成功判断标准) if expected_md_path.exists() and expected_json_path.exists(): result_info.update({ "success": True, "processing_time": 0, "output_files": { "md": str(expected_md_path), "json": str(expected_json_path) }, "skipped": True }) logger.info(f"✅ 文件已存在,跳过处理: {image_name}") return result_info # 使用 do_parse_single_file 处理 parse_result = self.do_parse_single_file(image_path, output_dir) # 处理完成后,再次检查输出文件是否存在(成功判断标准) if expected_md_path.exists() and expected_json_path.exists(): result_info.update({ "success": True, "output_files": parse_result.get("output_files", {}), "extraction_stats": parse_result.get("extraction_stats", {}) }) logger.info(f"✅ 处理成功: {image_name}") else: # 文件不存在,标记为失败 missing_files = [] if not expected_md_path.exists(): missing_files.append("md") if not expected_json_path.exists(): missing_files.append("json") result_info["error"] = f"输出文件不存在: {', '.join(missing_files)}" result_info["success"] = False logger.error(f"❌ 处理失败: {image_name} - {result_info['error']}") except Exception as e: result_info["error"] = str(e) result_info["success"] = False logger.error(f"Error processing {image_name}: {e}") if self.debug: traceback.print_exc() finally: result_info["processing_time"] = time.time() - start_time return result_info