| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- """
- PaddleX 统一处理器
- 支持多种 pipeline(PaddleOCR-VL 和 PP-StructureV3)的文档处理类
- """
- import os
- import time
- import traceback
- import warnings
- from pathlib import Path
- from typing import List, Dict, Any
- from loguru import logger
- # 抑制特定警告
- warnings.filterwarnings("ignore", message="To copy construct from a tensor")
- warnings.filterwarnings("ignore", message="Setting `pad_token_id`")
- warnings.filterwarnings("ignore", category=UserWarning, module="paddlex")
- from paddlex import create_pipeline
- # 导入工具函数
- import sys
- paddle_common_root = Path(__file__).parent
- if str(paddle_common_root) not in sys.path:
- sys.path.insert(0, str(paddle_common_root))
- from .utils import (
- convert_pruned_result_to_json,
- save_output_images,
- save_markdown_content
- )
- # 导入适配器
- from .adapters import (
- apply_table_recognition_adapter,
- restore_original_function,
- apply_enhanced_doc_preprocessor,
- restore_paddlex_doc_preprocessor
- )
class PaddleXProcessor:
    """Unified PaddleX document processor supporting several pipelines.

    Wraps a pipeline built by ``paddlex.create_pipeline`` (PaddleOCR-VL or
    PP-StructureV3), runs it on one image at a time and writes the JSON,
    image and Markdown outputs through the helper functions imported by this
    module.

    NOTE(review): the "enhanced adapter" helpers look like process-wide
    patches of PaddleX internals; if so, restoring them when one processor is
    destroyed also affects any other live processor -- confirm against the
    ``adapters`` module.
    """

    # Flags forwarded to ``pipeline.predict`` for PaddleOCR-VL pipelines
    # (predict keyword -> default used when the caller did not supply one).
    _VL_FLAG_DEFAULTS = {
        'use_layout_detection': True,
        'use_doc_orientation_classify': True,
        'use_doc_unwarping': False,
    }

    # Flags forwarded to ``pipeline.predict`` for PP-StructureV3 pipelines.
    _STRUCTURE_FLAG_DEFAULTS = {
        'use_doc_orientation_classify': True,
        'use_doc_unwarping': False,
        'use_layout_detection': True,
        'use_seal_recognition': True,
        'use_table_recognition': True,
        'use_formula_recognition': False,
        'use_chart_recognition': True,
        'use_ocr_results_with_table_cells': True,
        'use_table_orientation_classify': False,
        'use_wired_table_cells_trans_to_html': True,
        'use_wireless_table_cells_trans_to_html': True,
    }

    # Predict keyword -> shorter constructor keyword historically accepted
    # by this class for the same option.
    _LEGACY_ALIASES = {
        'use_doc_orientation_classify': 'use_doc_orientation',
    }

    def __init__(self,
                 pipeline_name: str = "PP-StructureV3",
                 device: str = "gpu:0",
                 normalize_numbers: bool = True,
                 use_enhanced_adapter: bool = True,
                 log_level: str = "INFO",
                 **kwargs):
        """Configure the processor and build the underlying pipeline.

        Args:
            pipeline_name: Pipeline name or path to a pipeline config file.
            device: Device string (e.g. ``'gpu:0'``, ``'cpu'``).
            normalize_numbers: Forwarded to the JSON/Markdown save helpers.
            use_enhanced_adapter: Whether to apply the table-recognition
                adapter and the enhanced document preprocessor.
            log_level: Log level name; when it is ``DEBUG`` (any letter
                case) full tracebacks are printed on errors.
            **kwargs: Per-call prediction switches, see
                ``_get_predict_kwargs``.

        Raises:
            Exception: Whatever ``create_pipeline`` raises is propagated.
        """
        self.pipeline_name = pipeline_name
        self.device = device
        self.normalize_numbers = normalize_numbers
        self.use_enhanced_adapter = use_enhanced_adapter
        self.log_level = log_level
        self.predict_kwargs = kwargs

        # Pipeline flavour is detected from the name (case-insensitive).
        self.is_paddleocr_vl = 'PaddleOCR-VL'.lower() in str(pipeline_name).lower()

        # Set before anything that can fail so cleanup code can rely on it.
        self.adapter_applied = False
        if use_enhanced_adapter:
            table_ok = bool(apply_table_recognition_adapter())
            # Same short-circuit as before: the preprocessor is only
            # attempted once the table adapter is in place.
            preprocessor_ok = table_ok and bool(apply_enhanced_doc_preprocessor())
            self.adapter_applied = table_ok and preprocessor_ok
            if self.adapter_applied:
                logger.info("🎯 Enhanced table recognition adapter activated and document preprocessor applied")
            else:
                if table_ok:
                    # Only half of the adapter pair was installed. Roll it
                    # back so the warning below ("using original
                    # implementation") is actually true and nothing is left
                    # patched without an owner.
                    # NOTE(review): assumes restore_original_function() is
                    # the inverse of apply_table_recognition_adapter().
                    restore_original_function()
                logger.warning("⚠️ Failed to apply adapter, using original implementation")

        self.pipeline = None
        self._initialize_pipeline()

        logger.info("PaddleX Processor 初始化完成:")
        logger.info(f"  - Pipeline: {pipeline_name}")
        logger.info(f"  - 设备: {device}")
        logger.info(f"  - Pipeline 类型: {'PaddleOCR-VL' if self.is_paddleocr_vl else 'PP-StructureV3'}")
        logger.info(f"  - 数字标准化: {normalize_numbers}")
        logger.info(f"  - 增强适配器: {use_enhanced_adapter}")
        logger.info(f"  - 日志级别: {log_level}")

    def _debug_enabled(self) -> bool:
        """Return True when the configured log level is DEBUG (any case)."""
        return str(self.log_level).upper() == "DEBUG"

    def _initialize_pipeline(self) -> None:
        """Create the PaddleX pipeline on the configured device.

        On failure the error is logged, any applied adapters are rolled
        back, and the original exception is re-raised.
        """
        try:
            # Environment-level filter to reduce warning noise.
            os.environ['PYTHONWARNINGS'] = 'ignore::UserWarning'

            logger.info(f"Initializing pipeline '{self.pipeline_name}' on device '{self.device}'...")
            self.pipeline = create_pipeline(self.pipeline_name, device=self.device)
            logger.info(f"Pipeline initialized successfully on {self.device}")

        except Exception as e:
            logger.error(f"Failed to initialize pipeline: {e}")
            if self._debug_enabled():
                traceback.print_exc()
            if self.adapter_applied:
                restore_original_function()
                restore_paddlex_doc_preprocessor()
                # Mark as rolled back so __del__ does not restore (and log)
                # a second time for this half-built instance.
                self.adapter_applied = False
            raise

    def _get_predict_kwargs(self) -> Dict[str, Any]:
        """Build the switches passed to ``pipeline.predict``.

        The set of switches depends on the pipeline flavour; each value is
        taken from the constructor ``**kwargs`` when present, otherwise from
        the class-level defaults. ``use_doc_orientation_classify`` may be
        supplied either under that name or under the legacy key
        ``use_doc_orientation``; the legacy key wins when both are given.

        Returns:
            dict: predict keyword -> value (without the ``input`` entry).
        """
        if self.is_paddleocr_vl:
            defaults = self._VL_FLAG_DEFAULTS
        else:
            defaults = self._STRUCTURE_FLAG_DEFAULTS

        supplied = self.predict_kwargs
        resolved = {}
        for option, default in defaults.items():
            legacy_key = self._LEGACY_ALIASES.get(option)
            if legacy_key is not None and legacy_key in supplied:
                resolved[option] = supplied[legacy_key]
            else:
                resolved[option] = supplied.get(option, default)
        return resolved

    def process_single_image(self, image_path: str, output_dir: str) -> Dict[str, Any]:
        """Run the pipeline on one image and persist its outputs.

        Args:
            image_path: Path of the image to process.
            output_dir: Directory handed to the save helpers.

        Returns:
            dict: Result record with the keys ``image_path``,
            ``processing_time``, ``success``, ``device``, ``error``,
            ``output_files``, ``is_pdf_page`` and ``processing_info``.
            ``success`` is True only when both the ``.md`` and ``.json``
            output files exist afterwards. Errors are captured in ``error``
            rather than raised.
        """
        start_time = time.time()
        source = Path(image_path)
        image_name = source.stem

        result_info = {
            "image_path": image_path,
            "processing_time": 0,
            "success": False,
            "device": self.device,
            "error": None,
            "output_files": {},
            # PDF pages are recognised purely by the file-name pattern.
            "is_pdf_page": "_page_" in source.name,
            "processing_info": {}
        }

        try:
            if self.pipeline is None:
                raise RuntimeError("Pipeline not initialized")

            predict_kwargs = self._get_predict_kwargs()
            predict_kwargs['input'] = image_path

            results = self.pipeline.predict(**predict_kwargs)

            # Only the first result is consumed: a single image is expected
            # to yield a single result, and the remainder of the iterable is
            # deliberately left untouched.
            result = next(iter(results), None)
            if result is None:
                raise RuntimeError("No results returned from pipeline")

            # Output files are named after the path reported by the result,
            # not after the path that was passed in.
            input_path = Path(result["input_path"])
            output_filename = input_path.stem

            # Convert and save the JSON output.
            json_output_path, converted_json = convert_pruned_result_to_json(
                result.json['res'],
                str(input_path),
                output_dir,
                output_filename,
                normalize_numbers=self.normalize_numbers
            )

            # Save the output images.
            saved_images = save_output_images(result.img, str(output_dir), output_filename)

            # Save the Markdown output.
            md_output_path = save_markdown_content(
                result.markdown,
                output_dir,
                output_filename,
                normalize_numbers=self.normalize_numbers,
                key_text='markdown_texts',
                key_images='markdown_images',
                json_data=converted_json
            )

            # Prefer the paths reported by the helpers; fall back to the
            # conventional location when a helper returned nothing.
            md_path = Path(md_output_path) if md_output_path else Path(output_dir) / f"{output_filename}.md"
            json_path = Path(json_output_path) if json_output_path else Path(output_dir) / f"{output_filename}.json"

            missing_files = [
                label
                for label, path in (("md", md_path), ("json", json_path))
                if not path.exists()
            ]

            if not missing_files:
                result_info.update({
                    "success": True,
                    "output_files": {
                        "md": str(md_path),
                        "json": str(json_path),
                        **saved_images
                    },
                    "processing_info": converted_json.get('processing_info', {})
                })
                logger.info(f"✅ 处理成功: {image_name}")
            else:
                # An expected output file is absent: report failure.
                result_info["error"] = f"输出文件不存在: {', '.join(missing_files)}"
                result_info["success"] = False
                logger.error(f"❌ 处理失败: {image_name} - {result_info['error']}")

        except Exception as e:
            # Boundary handler: callers get a failure record, never a raise.
            result_info["error"] = str(e)
            result_info["success"] = False
            logger.error(f"Error processing {image_name}: {e}")
            if self._debug_enabled():
                traceback.print_exc()

        finally:
            result_info["processing_time"] = time.time() - start_time

        return result_info

    def __del__(self):
        """Restore the patched PaddleX functions if this instance applied them."""
        # getattr: __del__ also runs for instances whose __init__ never got
        # as far as setting the flag.
        if not getattr(self, "adapter_applied", False):
            return
        try:
            restore_original_function()
            restore_paddlex_doc_preprocessor()
            self.adapter_applied = False
            logger.info("🔄 Original function restored")
        except Exception as e:
            logger.warning(f"Failed to restore original function: {e}")
|