Преглед изворни кода

feat(新增PDF文档类型分类工具): 添加PDF文档类型分类功能,支持判断可提取文本或需OCR

zhch158_admin пре 1 недеља
родитељ
комит
8032c96d96
1 измењених фајлова са 198 додато и 0 уклоњено
  1. 198 0
      ocr_utils/pdf_classify.py

+ 198 - 0
ocr_utils/pdf_classify.py

@@ -0,0 +1,198 @@
+"""
+PDF 文档类型分类工具
+
+封装自 MinerU 项目 mineru/utils/pdf_classify.py,作为 ocr_platform 的自有实现。
+功能:判断 PDF 是否可直接提取文本(txt)或需要 OCR(ocr)。
+
+对外接口:
+    classify(pdf_bytes: bytes) -> str   # 'txt' 或 'ocr'
+
+说明:
+    classify() 始终使用本模块的自有实现,以保留对 MinerU 原版的定制修改
+    (例如 avg_chars >= chars_threshold*4 时跳过图像覆盖率检测,避免含全页水印
+    图的文字型 PDF 被误判为 'ocr')。
+
+    内部 helper 函数(get_avg_cleaned_chars_per_page / get_high_image_coverage_ratio
+    / extract_pages / detect_invalid_chars)优先复用 MinerU 原版,供需要直接调用
+    helper 的场景使用;_USING_MINERU_HELPERS 标识当前是否使用 MinerU helpers。
+"""
+
+import re
+from io import BytesIO
+
+import numpy as np
+from loguru import logger
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# Helper 函数:优先复用 MinerU 原版(逻辑未修改,保持一致即可)
+# ──────────────────────────────────────────────────────────────────────────────
+
+try:
+    from mineru.utils.pdf_classify import (
+        get_avg_cleaned_chars_per_page,
+        get_high_image_coverage_ratio,
+        extract_pages,
+        detect_invalid_chars,
+    )
+    _USING_MINERU_HELPERS = True
+
+except ImportError:
+    _USING_MINERU_HELPERS = False
+
+    def get_avg_cleaned_chars_per_page(pdf_doc, pages_to_check: int) -> float:
+        """计算前 pages_to_check 页的平均清理后字符数。"""
+        cleaned_total = 0
+        for i in range(pages_to_check):
+            page = pdf_doc[i]
+            text = page.get_textpage().get_text_bounded()
+            cleaned_total += len(re.sub(r'\s+', '', text))
+        return cleaned_total / pages_to_check
+
+    def get_high_image_coverage_ratio(sample_pdf_bytes: bytes, pages_to_check: int) -> float:
+        """
+        计算高图像覆盖率(>= 80%)的页面占比。
+        使用 pdfminer 遍历页面布局元素。
+        """
+        from pdfminer.pdfparser import PDFParser
+        from pdfminer.pdfdocument import PDFDocument
+        from pdfminer.pdfpage import PDFPage
+        from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
+        from pdfminer.layout import LAParams, LTImage, LTFigure
+        from pdfminer.converter import PDFPageAggregator
+
+        pdf_stream = BytesIO(sample_pdf_bytes)
+        parser = PDFParser(pdf_stream)
+        document = PDFDocument(parser)
+
+        if not document.is_extractable:
+            return 1.0
+
+        rsrcmgr = PDFResourceManager()
+        laparams = LAParams(
+            line_overlap=0.5, char_margin=2.0, line_margin=0.5,
+            word_margin=0.1, boxes_flow=None, detect_vertical=False, all_texts=False,
+        )
+        device = PDFPageAggregator(rsrcmgr, laparams=laparams)
+        interpreter = PDFPageInterpreter(rsrcmgr, device)
+
+        high_coverage_pages = 0
+        page_count = 0
+
+        for page in PDFPage.create_pages(document):
+            if page_count >= pages_to_check:
+                break
+            interpreter.process_page(page)
+            layout = device.get_result()
+
+            page_area = layout.width * layout.height
+            image_area = sum(
+                el.width * el.height
+                for el in layout
+                if isinstance(el, (LTImage, LTFigure))
+            )
+            coverage = min(image_area / page_area, 1.0) if page_area > 0 else 0
+            if coverage >= 0.8:
+                high_coverage_pages += 1
+            page_count += 1
+
+        pdf_stream.close()
+        return 0.0 if page_count == 0 else high_coverage_pages / page_count
+
+    def extract_pages(src_pdf_bytes: bytes) -> bytes:
+        """从 PDF 字节数据随机提取最多 10 页,返回新的 PDF 字节数据。"""
+        import pypdfium2 as pdfium
+
+        pdf = pdfium.PdfDocument(src_pdf_bytes)
+        total_page = len(pdf)
+        if total_page == 0:
+            logger.warning("PDF 为空,返回空文档")
+            return b''
+
+        select_count = min(10, total_page)
+        page_indices = np.random.choice(total_page, select_count, replace=False).tolist()
+
+        sample_doc = pdfium.PdfDocument.new()
+        try:
+            sample_doc.import_pages(pdf, page_indices)
+            pdf.close()
+            buf = BytesIO()
+            sample_doc.save(buf)
+            return buf.getvalue()
+        except Exception as e:
+            pdf.close()
+            logger.exception(e)
+            return b''
+
+    def detect_invalid_chars(sample_pdf_bytes: bytes) -> bool:
+        """检测 PDF 中是否包含乱码字符((cid:xxx) 占比 > 5%)。"""
+        from pdfminer.high_level import extract_text
+        from pdfminer.layout import LAParams
+
+        laparams = LAParams(
+            line_overlap=0.5, char_margin=2.0, line_margin=0.5,
+            word_margin=0.1, boxes_flow=None, detect_vertical=False, all_texts=False,
+        )
+        text = extract_text(pdf_file=BytesIO(sample_pdf_bytes), laparams=laparams)
+        text = text.replace('\n', '')
+
+        cid_pattern = re.compile(r'\(cid:\d+\)')
+        matches = cid_pattern.findall(text)
+        cid_count = len(matches)
+        cid_len = sum(len(m) for m in matches)
+        text_len = len(text)
+
+        if text_len == 0:
+            return False
+        cid_radio = cid_count / (cid_count + text_len - cid_len)
+        return cid_radio > 0.05
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# classify:始终使用自有实现(包含对 MinerU 原版的定制修改)
+# ──────────────────────────────────────────────────────────────────────────────
+
+def classify(pdf_bytes: bytes) -> str:
+    """
+    判断 PDF 文件是可以直接提取文本还是需要 OCR。
+
+    与 MinerU 原版的差异(不修改上游代码):
+        检查图像覆盖率之前,若每页平均字符数已 >= chars_threshold * 4,
+        则视为确定的文字型 PDF,跳过覆盖率检测。
+        典型场景:含全页半透明水印图的银行流水文字 PDF,图像覆盖率接近 100%,
+        但每页有大量可提取文字,应分类为 'txt' 而非 'ocr'。
+
+    Returns:
+        'txt' — 可直接提取文本
+        'ocr' — 需要 OCR
+    """
+    import pypdfium2 as pdfium
+
+    sample_pdf_bytes = extract_pages(pdf_bytes)
+    pdf = pdfium.PdfDocument(sample_pdf_bytes)
+    try:
+        page_count = len(pdf)
+        if page_count == 0:
+            return 'ocr'
+
+        pages_to_check = min(page_count, 10)
+        chars_threshold = 50
+
+        avg_chars = get_avg_cleaned_chars_per_page(pdf, pages_to_check)
+        if avg_chars < chars_threshold or detect_invalid_chars(sample_pdf_bytes):
+            return 'ocr'
+
+        # 仅在文字数量处于"临界量"时以图像覆盖率辅助判断。
+        # 若文字数量已远超阈值(>= 4×),视为确定的文字型 PDF,
+        # 不受背景图(如水印)干扰,直接返回 'txt'。
+        if avg_chars < chars_threshold * 4 and get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check) >= 0.8:
+            return 'ocr'
+
+        return 'txt'
+
+    except Exception as e:
+        logger.error(f"判断 PDF 类型时出错: {e}")
+        return 'ocr'
+    finally:
+        pdf.close()
+