Browse Source

feat: 添加 PDF 文档类型检测功能,支持 pypdfium2 和 fitz 渲染引擎,优化文本提取过程

zhch158_admin 1 day ago
parent
commit
db1a81a141
1 changed files with 232 additions and 11 deletions
  1. 232 11
      ocr_utils/pdf_utils.py

+ 232 - 11
ocr_utils/pdf_utils.py

@@ -52,13 +52,41 @@ class PDFUtils:
             页面索引集合(0-based)
         """
         return parse_page_range(page_range, total_pages)
+
+    @staticmethod
+    def _detect_pdf_doc_type(pdf_doc: Any) -> str:
+        """
+        检测 PDF 文档对象类型
+        
+        Args:
+            pdf_doc: PDF 文档对象
+            
+        Returns:
+            'pypdfium2' 或 'fitz'
+        """
+        doc_type_name = type(pdf_doc).__name__
+        doc_module = type(pdf_doc).__module__
+        
+        if 'pdfium' in doc_module.lower() or 'PdfDocument' in doc_type_name:
+            return 'pypdfium2'
+        elif 'fitz' in doc_module.lower() or 'Document' in doc_type_name:
+            return 'fitz'
+        else:
+            # 尝试通过属性判断
+            if hasattr(pdf_doc, 'get_page') or hasattr(pdf_doc, 'page_count'):
+                # fitz.Document 有 page_count 属性
+                return 'fitz'
+            else:
+                # pypdfium2 通过索引访问
+                return 'pypdfium2'
     
     @staticmethod
     def load_and_classify_document(
         document_path: Path,
         dpi: int = 200,
-        page_range: Optional[str] = None
-    ) -> Tuple[List[Dict], str, Optional[Any]]:
+        page_range: Optional[str] = None,
+        renderer: str = "fitz"  # 新增参数,默认 fitz
+    ) -> Tuple[List[Dict], str, Optional[Any], str]:
         """
         加载文档并分类,支持页面范围过滤
         
@@ -68,12 +96,14 @@ class PDFUtils:
             page_range: 页面范围字符串,如 "1-5,7,9-12"
                        - PDF:按页码(从1开始)
                        - 图片目录:按文件名排序后的位置(从1开始)
+            renderer: PDF渲染引擎,"fitz" 或 "pypdfium2"
             
         Returns:
             (images_list, pdf_type, pdf_doc, renderer_used)
             - images_list: 图像列表,每个元素包含 {'img_pil': PIL.Image, 'scale': float, 'page_idx': int}
             - pdf_type: 'ocr' 或 'txt'
-            - pdf_doc: PDF文档对象(如果是PDF)
+            - pdf_doc: PDF文档对象(如果是PDF)
+            - renderer_used: 实际使用的渲染器类型
         """
         pdf_doc = None
         pdf_type = 'ocr'  # 默认使用OCR模式
@@ -128,7 +158,7 @@ class PDFUtils:
                 pdf_bytes, 
                 dpi=dpi,
                 image_type=ImageType.PIL,
-                renderer='fitz'
+                renderer=renderer   # 使用指定的渲染引擎
             )
             
             # 解析页面范围
@@ -167,7 +197,7 @@ class PDFUtils:
         else:
             raise ValueError(f"Unsupported file format: {document_path.suffix}")
         
-        return all_images, pdf_type, pdf_doc
+        return all_images, pdf_type, pdf_doc, renderer
     
     @staticmethod
     def extract_text_from_pdf(
@@ -177,10 +207,10 @@ class PDFUtils:
         scale: float
     ) -> Tuple[str, bool]:
         """
-        从PDF直接提取文本(使用 MinerU 的 pypdfium2 方式
+        从PDF直接提取文本(支持 pypdfium2 和 fitz)
         
         Args:
-            pdf_doc: pypdfium2 的 PdfDocument 对象
+            pdf_doc: PDF文档对象 (pypdfium2.PdfDocument 或 fitz.Document)
             page_idx: 页码索引
             bbox: 目标区域的bbox(图像坐标)
             scale: 图像与PDF的缩放比例
@@ -188,8 +218,24 @@ class PDFUtils:
         Returns:
             (text, success)
         """
+        # 检测 PDF 文档类型
+        doc_type = PDFUtils._detect_pdf_doc_type(pdf_doc)
+        
+        if doc_type == 'fitz':
+            return PDFUtils._extract_text_from_pdf_fitz(pdf_doc, page_idx, bbox, scale)
+        else:  # pypdfium2
+            return PDFUtils._extract_text_from_pdf_pypdfium2(pdf_doc, page_idx, bbox, scale)
+    
+    @staticmethod
+    def _extract_text_from_pdf_pypdfium2(
+        pdf_doc: Any,
+        page_idx: int,
+        bbox: List[float],
+        scale: float
+    ) -> Tuple[str, bool]:
+        """使用 pypdfium2 提取文本(原有实现)"""
         if not MINERU_AVAILABLE or pdf_get_page_text is None:
-            logger.debug("MinerU pdf_text_tool not available")
+            logger.error("MinerU pdf_text_tool not available")
             return "", False
             
         try:
@@ -212,13 +258,12 @@ class PDFUtils:
                 for line in block.get('lines', []):
                     line_bbox = line.get('bbox')
                     if line_bbox and hasattr(line_bbox, 'bbox'):
-                        line_bbox = line_bbox.bbox  # pdftext 的 BBox 对象
+                        line_bbox = line_bbox.bbox
                     elif isinstance(line_bbox, (list, tuple)) and len(line_bbox) >= 4:
                         line_bbox = list(line_bbox)
                     else:
                         continue
                     
-                    # 检查 line 是否与目标 bbox 重叠
                     if PDFUtils._bbox_overlap(pdf_bbox, line_bbox):
                         for span in line.get('spans', []):
                             span_text = span.get('text', '')
@@ -230,11 +275,187 @@ class PDFUtils:
             
         except Exception as e:
             import traceback
-            logger.debug(f"PDF text extraction error: {e}")
+            logger.debug(f"pypdfium2 text extraction error: {e}")
             logger.debug(traceback.format_exc())
             return "", False
     
     @staticmethod
+    def _extract_text_from_pdf_fitz(
+        pdf_doc: Any,
+        page_idx: int,
+        bbox: List[float],
+        scale: float
+    ) -> Tuple[str, bool]:
+        """使用 fitz 提取文本"""
+        try:
+            import fitz
+        except ImportError:
+            logger.error("PyMuPDF (fitz) not available")
+            return "", False
+        
+        try:
+            page = pdf_doc[page_idx]
+            
+            # 将图像坐标转换为PDF坐标
+            pdf_bbox = fitz.Rect(
+                bbox[0] / scale,
+                bbox[1] / scale,
+                bbox[2] / scale,
+                bbox[3] / scale
+            )
+            
+            # 提取区域内的文本
+            text = page.get_text("text", clip=pdf_bbox)
+            
+            return text.strip(), bool(text.strip())
+            
+        except Exception as e:
+            import traceback
+            logger.debug(f"fitz text extraction error: {e}")
+            logger.debug(traceback.format_exc())
+            return "", False
+    
+    @staticmethod
+    def extract_all_text_blocks(
+        pdf_doc: Any,
+        page_idx: int,
+        scale: float
+    ) -> List[Dict[str, Any]]:
+        """
+        提取页面所有文本块(支持 pypdfium2 和 fitz)
+        
+        Args:
+            pdf_doc: PDF文档对象
+            page_idx: 页码
+            scale: 缩放比例
+            
+        Returns:
+            文本块列表 [{'text': str, 'bbox': [x1, y1, x2, y2]}, ...]
+        """
+        # 检测 PDF 文档类型
+        doc_type = PDFUtils._detect_pdf_doc_type(pdf_doc)
+        
+        if doc_type == 'fitz':
+            return PDFUtils._extract_all_text_blocks_fitz(pdf_doc, page_idx, scale)
+        else:  # pypdfium2
+            return PDFUtils._extract_all_text_blocks_pypdfium2(pdf_doc, page_idx, scale)
+    
+    @staticmethod
+    def _extract_all_text_blocks_pypdfium2(
+        pdf_doc: Any,
+        page_idx: int,
+        scale: float
+    ) -> List[Dict[str, Any]]:
+        """使用 pypdfium2 提取所有文本块(原有实现)"""
+        if not MINERU_AVAILABLE or pdf_get_page_text is None:
+            return []
+            
+        try:
+            page = pdf_doc[page_idx]
+            page_dict = pdf_get_page_text(page)
+            
+            extracted_blocks = []
+            
+            for block in page_dict.get('blocks', []):
+                for line in block.get('lines', []):
+                    line_text = ""
+                    for span in line.get('spans', []):
+                        line_text += span.get('text', "")
+                    
+                    if not line_text.strip():
+                        continue
+                        
+                    line_bbox = line.get('bbox')
+                    if line_bbox and hasattr(line_bbox, 'bbox'):
+                        line_bbox = line_bbox.bbox
+                    elif isinstance(line_bbox, (list, tuple)) and len(line_bbox) >= 4:
+                        line_bbox = list(line_bbox)
+                    else:
+                        continue
+                        
+                    img_bbox = [
+                        line_bbox[0] * scale,
+                        line_bbox[1] * scale,
+                        line_bbox[2] * scale,
+                        line_bbox[3] * scale
+                    ]
+                    
+                    extracted_blocks.append({
+                        'text': line_text,
+                        'bbox': img_bbox,
+                        'origin_bbox': line_bbox
+                    })
+            
+            return extracted_blocks
+            
+        except Exception as e:
+            logger.warning(f"pypdfium2 extract_all_text_blocks failed: {e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            return []
+    
+    @staticmethod
+    def _extract_all_text_blocks_fitz(
+        pdf_doc: Any,
+        page_idx: int,
+        scale: float
+    ) -> List[Dict[str, Any]]:
+        """使用 fitz 提取所有文本块"""
+        try:
+            import fitz
+        except ImportError:
+            logger.warning("PyMuPDF (fitz) not available")
+            return []
+        
+        try:
+            page = pdf_doc[page_idx]
+            
+            # 使用 get_text("dict") 获取详细的文本信息
+            text_dict = page.get_text("dict")
+            
+            extracted_blocks = []
+            
+            # 遍历所有 blocks
+            for block in text_dict.get("blocks", []):
+                # 只处理文本块(type=0)
+                if block.get("type") != 0:
+                    continue
+                
+                # 遍历所有 lines
+                for line in block.get("lines", []):
+                    line_text = ""
+                    line_bbox = line.get("bbox")
+                    
+                    # 提取 line 中的所有 span 文本
+                    for span in line.get("spans", []):
+                        line_text += span.get("text", "")
+                    
+                    if not line_text.strip() or not line_bbox:
+                        continue
+                    
+                    # PDF 坐标转换为图像坐标
+                    img_bbox = [
+                        line_bbox[0] * scale,
+                        line_bbox[1] * scale,
+                        line_bbox[2] * scale,
+                        line_bbox[3] * scale
+                    ]
+                    
+                    extracted_blocks.append({
+                        'text': line_text,
+                        'bbox': img_bbox,
+                        'origin_bbox': list(line_bbox)
+                    })
+            
+            return extracted_blocks
+            
+        except Exception as e:
+            logger.warning(f"fitz extract_all_text_blocks failed: {e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            return []    
+
+    @staticmethod
     def _bbox_overlap(bbox1: List[float], bbox2: List[float]) -> bool:
         """检查两个 bbox 是否重叠"""
         if len(bbox1) < 4 or len(bbox2) < 4: