pdf_classify.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. """
  2. PDF 文档类型分类工具
  3. 封装自 MinerU 项目 mineru/utils/pdf_classify.py,作为 ocr_platform 的自有实现。
  4. 功能:判断 PDF 是否可直接提取文本(txt)或需要 OCR(ocr)。
  5. 对外接口:
  6. classify(pdf_bytes: bytes) -> str # 'txt' 或 'ocr'
  7. 说明:
  8. classify() 始终使用本模块的自有实现,以保留对 MinerU 原版的定制修改
  9. (例如 avg_chars >= chars_threshold*4 时跳过图像覆盖率检测,避免含全页水印
  10. 图的文字型 PDF 被误判为 'ocr')。
  11. 内部 helper 函数(get_avg_cleaned_chars_per_page / get_high_image_coverage_ratio
  12. / extract_pages / detect_invalid_chars)优先复用 MinerU 原版,供需要直接调用
  13. helper 的场景使用;_USING_MINERU_HELPERS 标识当前是否使用 MinerU helpers。
  14. """
  15. import re
  16. from io import BytesIO
  17. import numpy as np
  18. from loguru import logger
  19. # ──────────────────────────────────────────────────────────────────────────────
  20. # Helper 函数:优先复用 MinerU 原版(逻辑未修改,保持一致即可)
  21. # ──────────────────────────────────────────────────────────────────────────────
  22. try:
  23. from mineru.utils.pdf_classify import (
  24. get_avg_cleaned_chars_per_page,
  25. get_high_image_coverage_ratio,
  26. extract_pages,
  27. detect_invalid_chars,
  28. )
  29. _USING_MINERU_HELPERS = True
  30. except ImportError:
  31. _USING_MINERU_HELPERS = False
  32. def get_avg_cleaned_chars_per_page(pdf_doc, pages_to_check: int) -> float:
  33. """计算前 pages_to_check 页的平均清理后字符数。"""
  34. cleaned_total = 0
  35. for i in range(pages_to_check):
  36. page = pdf_doc[i]
  37. text = page.get_textpage().get_text_bounded()
  38. cleaned_total += len(re.sub(r'\s+', '', text))
  39. return cleaned_total / pages_to_check
  40. def get_high_image_coverage_ratio(sample_pdf_bytes: bytes, pages_to_check: int) -> float:
  41. """
  42. 计算高图像覆盖率(>= 80%)的页面占比。
  43. 使用 pdfminer 遍历页面布局元素。
  44. """
  45. from pdfminer.pdfparser import PDFParser
  46. from pdfminer.pdfdocument import PDFDocument
  47. from pdfminer.pdfpage import PDFPage
  48. from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
  49. from pdfminer.layout import LAParams, LTImage, LTFigure
  50. from pdfminer.converter import PDFPageAggregator
  51. pdf_stream = BytesIO(sample_pdf_bytes)
  52. parser = PDFParser(pdf_stream)
  53. document = PDFDocument(parser)
  54. if not document.is_extractable:
  55. return 1.0
  56. rsrcmgr = PDFResourceManager()
  57. laparams = LAParams(
  58. line_overlap=0.5, char_margin=2.0, line_margin=0.5,
  59. word_margin=0.1, boxes_flow=None, detect_vertical=False, all_texts=False,
  60. )
  61. device = PDFPageAggregator(rsrcmgr, laparams=laparams)
  62. interpreter = PDFPageInterpreter(rsrcmgr, device)
  63. high_coverage_pages = 0
  64. page_count = 0
  65. for page in PDFPage.create_pages(document):
  66. if page_count >= pages_to_check:
  67. break
  68. interpreter.process_page(page)
  69. layout = device.get_result()
  70. page_area = layout.width * layout.height
  71. image_area = sum(
  72. el.width * el.height
  73. for el in layout
  74. if isinstance(el, (LTImage, LTFigure))
  75. )
  76. coverage = min(image_area / page_area, 1.0) if page_area > 0 else 0
  77. if coverage >= 0.8:
  78. high_coverage_pages += 1
  79. page_count += 1
  80. pdf_stream.close()
  81. return 0.0 if page_count == 0 else high_coverage_pages / page_count
  82. def extract_pages(src_pdf_bytes: bytes) -> bytes:
  83. """从 PDF 字节数据随机提取最多 10 页,返回新的 PDF 字节数据。"""
  84. import pypdfium2 as pdfium
  85. pdf = pdfium.PdfDocument(src_pdf_bytes)
  86. total_page = len(pdf)
  87. if total_page == 0:
  88. logger.warning("PDF 为空,返回空文档")
  89. return b''
  90. select_count = min(10, total_page)
  91. page_indices = np.random.choice(total_page, select_count, replace=False).tolist()
  92. sample_doc = pdfium.PdfDocument.new()
  93. try:
  94. sample_doc.import_pages(pdf, page_indices)
  95. pdf.close()
  96. buf = BytesIO()
  97. sample_doc.save(buf)
  98. return buf.getvalue()
  99. except Exception as e:
  100. pdf.close()
  101. logger.exception(e)
  102. return b''
  103. def detect_invalid_chars(sample_pdf_bytes: bytes) -> bool:
  104. """检测 PDF 中是否包含乱码字符((cid:xxx) 占比 > 5%)。"""
  105. from pdfminer.high_level import extract_text
  106. from pdfminer.layout import LAParams
  107. laparams = LAParams(
  108. line_overlap=0.5, char_margin=2.0, line_margin=0.5,
  109. word_margin=0.1, boxes_flow=None, detect_vertical=False, all_texts=False,
  110. )
  111. text = extract_text(pdf_file=BytesIO(sample_pdf_bytes), laparams=laparams)
  112. text = text.replace('\n', '')
  113. cid_pattern = re.compile(r'\(cid:\d+\)')
  114. matches = cid_pattern.findall(text)
  115. cid_count = len(matches)
  116. cid_len = sum(len(m) for m in matches)
  117. text_len = len(text)
  118. if text_len == 0:
  119. return False
  120. cid_radio = cid_count / (cid_count + text_len - cid_len)
  121. return cid_radio > 0.05
  122. # ──────────────────────────────────────────────────────────────────────────────
  123. # classify:始终使用自有实现(包含对 MinerU 原版的定制修改)
  124. # ──────────────────────────────────────────────────────────────────────────────
  125. def classify(pdf_bytes: bytes) -> str:
  126. """
  127. 判断 PDF 文件是可以直接提取文本还是需要 OCR。
  128. 与 MinerU 原版的差异(不修改上游代码):
  129. 检查图像覆盖率之前,若每页平均字符数已 >= chars_threshold * 4,
  130. 则视为确定的文字型 PDF,跳过覆盖率检测。
  131. 典型场景:含全页半透明水印图的银行流水文字 PDF,图像覆盖率接近 100%,
  132. 但每页有大量可提取文字,应分类为 'txt' 而非 'ocr'。
  133. Returns:
  134. 'txt' — 可直接提取文本
  135. 'ocr' — 需要 OCR
  136. """
  137. import pypdfium2 as pdfium
  138. sample_pdf_bytes = extract_pages(pdf_bytes)
  139. pdf = pdfium.PdfDocument(sample_pdf_bytes)
  140. try:
  141. page_count = len(pdf)
  142. if page_count == 0:
  143. return 'ocr'
  144. pages_to_check = min(page_count, 10)
  145. chars_threshold = 50
  146. avg_chars = get_avg_cleaned_chars_per_page(pdf, pages_to_check)
  147. if avg_chars < chars_threshold or detect_invalid_chars(sample_pdf_bytes):
  148. return 'ocr'
  149. # 仅在文字数量处于"临界量"时以图像覆盖率辅助判断。
  150. # 若文字数量已远超阈值(>= 4×),视为确定的文字型 PDF,
  151. # 不受背景图(如水印)干扰,直接返回 'txt'。
  152. if avg_chars < chars_threshold * 4 and get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check) >= 0.8:
  153. return 'ocr'
  154. return 'txt'
  155. except Exception as e:
  156. logger.error(f"判断 PDF 类型时出错: {e}")
  157. return 'ocr'
  158. finally:
  159. pdf.close()