""" PaddleX 统一处理器 支持多种 pipeline(PaddleOCR-VL 和 PP-StructureV3)的文档处理类 """ import os import time import traceback import warnings from pathlib import Path from typing import List, Dict, Any from loguru import logger # 抑制特定警告 warnings.filterwarnings("ignore", message="To copy construct from a tensor") warnings.filterwarnings("ignore", message="Setting `pad_token_id`") warnings.filterwarnings("ignore", category=UserWarning, module="paddlex") from paddlex import create_pipeline # 导入工具函数 import sys paddle_common_root = Path(__file__).parent if str(paddle_common_root) not in sys.path: sys.path.insert(0, str(paddle_common_root)) from .utils import ( convert_pruned_result_to_json, save_output_images, save_markdown_content ) # 导入适配器 from .adapters import ( apply_table_recognition_adapter, restore_original_function, apply_enhanced_doc_preprocessor, restore_paddlex_doc_preprocessor ) class PaddleXProcessor: """PaddleX 统一处理器,支持多种 pipeline""" def __init__(self, pipeline_name: str = "PP-StructureV3", device: str = "gpu:0", normalize_numbers: bool = True, use_enhanced_adapter: bool = True, log_level: str = "INFO", **kwargs): """ 初始化处理器 Args: pipeline_name: Pipeline 名称或配置文件路径 device: 设备字符串(如 'gpu:0', 'cpu') normalize_numbers: 是否标准化数字 use_enhanced_adapter: 是否使用增强适配器 log_level: 日志级别(DEBUG, INFO, WARNING, ERROR),当为 DEBUG 时会打印详细错误信息 **kwargs: 其他预测参数 """ self.pipeline_name = pipeline_name self.device = device self.normalize_numbers = normalize_numbers self.use_enhanced_adapter = use_enhanced_adapter self.log_level = log_level self.predict_kwargs = kwargs # 检测 pipeline 类型 self.is_paddleocr_vl = 'PaddleOCR-VL'.lower() in str(pipeline_name).lower() # 应用适配器 self.adapter_applied = False if use_enhanced_adapter: self.adapter_applied = apply_table_recognition_adapter() and apply_enhanced_doc_preprocessor() if self.adapter_applied: logger.info("🎯 Enhanced table recognition adapter activated and document preprocessor applied") else: logger.warning("⚠️ Failed to apply adapter, using original implementation") # 初始化 pipeline self.pipeline = None self._initialize_pipeline() logger.info(f"PaddleX Processor 初始化完成:") logger.info(f" - Pipeline: {pipeline_name}") logger.info(f" - 设备: {device}") logger.info(f" - Pipeline 类型: {'PaddleOCR-VL' if self.is_paddleocr_vl else 'PP-StructureV3'}") logger.info(f" - 数字标准化: {normalize_numbers}") logger.info(f" - 增强适配器: {use_enhanced_adapter}") logger.info(f" - 日志级别: {log_level}") def _initialize_pipeline(self): """初始化 pipeline""" try: # 设置环境变量以减少警告 os.environ['PYTHONWARNINGS'] = 'ignore::UserWarning' logger.info(f"Initializing pipeline '{self.pipeline_name}' on device '{self.device}'...") self.pipeline = create_pipeline(self.pipeline_name, device=self.device) logger.info(f"Pipeline initialized successfully on {self.device}") except Exception as e: logger.error(f"Failed to initialize pipeline: {e}") if self.log_level == "DEBUG": traceback.print_exc() if self.adapter_applied: restore_original_function() restore_paddlex_doc_preprocessor() raise def _get_predict_kwargs(self) -> Dict[str, Any]: """根据 pipeline 类型获取预测参数""" if self.is_paddleocr_vl: # PaddleOCR-VL 使用驼峰命名 return { 'use_layout_detection': self.predict_kwargs.get('use_layout_detection', True), 'use_doc_orientation_classify': self.predict_kwargs.get('use_doc_orientation', True), 'use_doc_unwarping': self.predict_kwargs.get('use_doc_unwarping', False), } else: # PP-StructureV3 使用下划线命名 return { 'use_doc_orientation_classify': self.predict_kwargs.get('use_doc_orientation', True), 'use_doc_unwarping': 
                'use_layout_detection': self.predict_kwargs.get('use_layout_detection', True),
                'use_seal_recognition': self.predict_kwargs.get('use_seal_recognition', True),
                'use_table_recognition': self.predict_kwargs.get('use_table_recognition', True),
                'use_formula_recognition': self.predict_kwargs.get('use_formula_recognition', False),
                'use_chart_recognition': self.predict_kwargs.get('use_chart_recognition', True),
                'use_ocr_results_with_table_cells': self.predict_kwargs.get('use_ocr_results_with_table_cells', True),
                'use_table_orientation_classify': self.predict_kwargs.get('use_table_orientation_classify', False),
                'use_wired_table_cells_trans_to_html': self.predict_kwargs.get('use_wired_table_cells_trans_to_html', True),
                'use_wireless_table_cells_trans_to_html': self.predict_kwargs.get('use_wireless_table_cells_trans_to_html', True),
            }

    def process_single_image(self, image_path: str, output_dir: str) -> Dict[str, Any]:
        """
        Process a single image.

        Args:
            image_path: Path to the image.
            output_dir: Output directory.

        Returns:
            dict: Processing result with a "success" flag that is based on whether
                the output files actually exist.
        """
        start_time = time.time()
        image_path_obj = Path(image_path)
        image_name = image_path_obj.stem

        # Detect PDF pages by the file-name pattern
        is_pdf_page = "_page_" in image_path_obj.name

        result_info = {
            "image_path": image_path,
            "processing_time": 0,
            "success": False,
            "device": self.device,
            "error": None,
            "output_files": {},
            "is_pdf_page": is_pdf_page,
            "processing_info": {}
        }

        try:
            if self.pipeline is None:
                raise RuntimeError("Pipeline not initialized")

            # Prepare prediction parameters
            predict_kwargs = self._get_predict_kwargs()
            predict_kwargs['input'] = image_path

            # Run the pipeline
            results = self.pipeline.predict(**predict_kwargs)

            # The pipeline returns a generator; a single image should yield exactly
            # one result, so iterate it and keep only the first item.
            result = None
            for idx, res in enumerate(results):
                if idx > 0:
                    raise ValueError("Multiple results found for a single image")
                result = res
                break  # only the first result is used

            if result is None:
                raise RuntimeError("No results returned from pipeline")

            input_path = Path(result["input_path"])

            # Derive the output file name from the input path
            # (PaddleX's result["input_path"] may already include page information)
            output_filename = input_path.stem

            # Convert and save the standard JSON output
            json_content = result.json['res']
            json_output_path, converted_json = convert_pruned_result_to_json(
                json_content, str(input_path), output_dir, output_filename,
                normalize_numbers=self.normalize_numbers
            )

            # Save the output images
            img_content = result.img
            saved_images = save_output_images(img_content, str(output_dir), output_filename)

            # Save the Markdown content
            markdown_content = result.markdown
            md_output_path = save_markdown_content(
                markdown_content, output_dir, output_filename,
                normalize_numbers=self.normalize_numbers,
                key_text='markdown_texts', key_images='markdown_images',
                json_data=converted_json
            )

            # Success is judged by the files actually written:
            # both the .md and the .json must exist.
            actual_md_path = Path(md_output_path) if md_output_path else Path(output_dir) / f"{output_filename}.md"
            actual_json_path = Path(json_output_path) if json_output_path else Path(output_dir) / f"{output_filename}.json"

            if actual_md_path.exists() and actual_json_path.exists():
                result_info.update({
                    "success": True,
                    "output_files": {
                        "md": str(actual_md_path),
                        "json": str(actual_json_path),
                        **saved_images
                    },
                    "processing_info": converted_json.get('processing_info', {})
                })
                logger.info(f"✅ Processed successfully: {image_name}")
            else:
                # One or both output files are missing; mark as failed
                missing_files = []
                if not actual_md_path.exists():
                    missing_files.append("md")
                if not actual_json_path.exists():
                    missing_files.append("json")
                result_info["error"] = f"Missing output files: {', '.join(missing_files)}"
                result_info["success"] = False
                logger.error(f"❌ Processing failed: {image_name} - {result_info['error']}")

        except Exception as e:
            result_info["error"] = str(e)
            result_info["success"] = False
            logger.error(f"Error processing {image_name}: {e}")
            if self.log_level == "DEBUG":
                traceback.print_exc()
        finally:
            result_info["processing_time"] = time.time() - start_time

        return result_info

    def __del__(self):
        """Restore the patched functions when the processor is destroyed."""
        if self.adapter_applied:
            try:
                restore_original_function()
                restore_paddlex_doc_preprocessor()
                logger.info("🔄 Original function restored")
            except Exception as e:
                logger.warning(f"Failed to restore original function: {e}")
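

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the processor itself).
# The directories below are hypothetical placeholders, and because this module
# uses relative imports it must be run as part of its package, e.g.
# `python -m <your_package>.<this_module>`, or the snippet adapted into a
# separate driver script.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import glob

    processor = PaddleXProcessor(
        pipeline_name="PP-StructureV3",
        device="cpu",                  # assumption: switch to 'gpu:0' if a GPU is available
        normalize_numbers=True,
        use_enhanced_adapter=False,    # assumption: skip the adapters for a quick local test
        log_level="DEBUG",
    )

    output_dir = "./paddlex_output"    # hypothetical output directory
    os.makedirs(output_dir, exist_ok=True)

    # hypothetical input files: every PNG page image in ./samples
    for image_path in sorted(glob.glob("./samples/*.png")):
        info = processor.process_single_image(image_path, output_dir)
        status = "ok" if info["success"] else f"failed: {info['error']}"
        logger.info(f"{image_path}: {status} ({info['processing_time']:.2f}s)")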