# Copyright (c) Opendatalab. All rights reserved.
import re
from io import BytesIO

import numpy as np
import pypdfium2 as pdfium
from loguru import logger
from pdfminer.high_level import extract_text
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfinterp import PDFResourceManager
from pdfminer.pdfinterp import PDFPageInterpreter
from pdfminer.layout import LAParams, LTImage, LTFigure
from pdfminer.converter import PDFPageAggregator

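# Third-party dependencies used below: pypdfium2, numpy, loguru, and pdfminer
# (typically distributed as the pdfminer.six package).
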
def classify(pdf_bytes):
    """
    Determine whether a PDF's text can be extracted directly or whether OCR is required.

    Args:
        pdf_bytes: Raw bytes of the PDF file.

    Returns:
        str: 'txt' if text can be extracted directly, 'ocr' if OCR is required.
    """
    try:
        # Build a sample PDF (at most 10 random pages) from the raw bytes
        sample_pdf_bytes = extract_pages(pdf_bytes)
        pdf = pdfium.PdfDocument(sample_pdf_bytes)

        # Number of pages in the sampled PDF
        page_count = len(pdf)

        # An empty PDF goes straight to OCR
        if page_count == 0:
            return 'ocr'

        # Number of pages to inspect (at most 10)
        pages_to_check = min(page_count, 10)

        # Threshold: fewer than 50 meaningful characters per page on average means OCR is needed
        chars_threshold = 50

        if (get_avg_cleaned_chars_per_page(pdf, pages_to_check) < chars_threshold) or detect_invalid_chars(sample_pdf_bytes):
            return 'ocr'
        elif get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check) >= 0.8:
            return 'ocr'
        return 'txt'
    except Exception as e:
        logger.error(f"Error while classifying the PDF: {e}")
        # Fall back to OCR on any failure
        return 'ocr'

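# Decision flow implemented by classify() above:
#   average cleaned characters per sampled page < 50  -> 'ocr'
#   ratio of (cid:N) garbled tokens > 5%              -> 'ocr'
#   >= 80% of sampled pages are mostly image-covered  -> 'ocr'
#   otherwise                                         -> 'txt'
# Any exception during analysis also falls back to 'ocr'.
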
def get_avg_cleaned_chars_per_page(pdf_doc, pages_to_check):
    # Total number of extracted characters
    total_chars = 0
    # Total number of characters after removing whitespace
    cleaned_total_chars = 0

    # Inspect the text of the first few pages
    for i in range(pages_to_check):
        page = pdf_doc[i]
        text_page = page.get_textpage()
        text = text_page.get_text_bounded()
        total_chars += len(text)
        # Clean the extracted text by stripping all whitespace characters
        cleaned_text = re.sub(r'\s+', '', text)
        cleaned_total_chars += len(cleaned_text)

    # Average number of cleaned characters per page
    avg_cleaned_chars_per_page = cleaned_total_chars / pages_to_check

    # logger.debug(f"PDF analysis: {avg_cleaned_chars_per_page:.1f} cleaned characters per page on average")

    pdf_doc.close()  # Close the PDF document
    return avg_cleaned_chars_per_page

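# Illustrative example for get_avg_cleaned_chars_per_page() (hypothetical numbers):
# if the 10 sampled pages contain 300 non-whitespace characters in total, the
# average is 300 / 10 = 30 characters per page, which is below the 50-character
# threshold used in classify(), so the document would be routed to OCR.
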
def get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check):
    # In-memory file object for the sampled PDF
    pdf_stream = BytesIO(sample_pdf_bytes)
    # PDF parser
    parser = PDFParser(pdf_stream)
    # PDF document object
    document = PDFDocument(parser)

    # Check whether the document allows content extraction
    if not document.is_extractable:
        # logger.warning("PDF does not allow content extraction")
        return 1.0  # Treat as high coverage, since the content cannot be extracted

    # Resource manager and layout analysis parameters
    rsrcmgr = PDFResourceManager()
    laparams = LAParams(
        line_overlap=0.5,
        char_margin=2.0,
        line_margin=0.5,
        word_margin=0.1,
        boxes_flow=None,
        detect_vertical=False,
        all_texts=False,
    )
    # Page aggregator
    device = PDFPageAggregator(rsrcmgr, laparams=laparams)
    # Page interpreter
    interpreter = PDFPageInterpreter(rsrcmgr, device)

    # Number of pages with high image coverage
    high_image_coverage_pages = 0
    page_count = 0

    # Iterate over the pages
    for page in PDFPage.create_pages(document):
        # Limit the number of pages inspected
        if page_count >= pages_to_check:
            break

        # Process the page
        interpreter.process_page(page)
        layout = device.get_result()

        # Page dimensions
        page_width = layout.width
        page_height = layout.height
        page_area = page_width * page_height

        # Total area covered by images on this page
        image_area = 0

        # Iterate over the page elements
        for element in layout:
            # Check whether the element is an image or figure
            if isinstance(element, (LTImage, LTFigure)):
                # Area of the element's bounding box
                img_width = element.width
                img_height = element.height
                img_area = img_width * img_height
                image_area += img_area

        # Coverage ratio for this page
        coverage_ratio = min(image_area / page_area, 1.0) if page_area > 0 else 0
        # logger.debug(f"PDF analysis: page {page_count + 1} image coverage: {coverage_ratio:.2f}")

        # Count the page if its coverage is high
        if coverage_ratio >= 0.8:  # 80% is the threshold for high coverage
            high_image_coverage_pages += 1

        page_count += 1

    # If no pages were processed, return 0
    if page_count == 0:
        return 0.0

    # Fraction of inspected pages with high image coverage
    high_coverage_ratio = high_image_coverage_pages / page_count
    # logger.debug(f"PDF analysis: ratio of pages with high image coverage: {high_coverage_ratio:.2f}")

    # Release resources
    pdf_stream.close()

    return high_coverage_ratio

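# Illustrative example for get_high_image_coverage_ratio() (hypothetical numbers):
# if 9 of the 10 inspected pages have LTImage/LTFigure boxes covering at least 80%
# of the page area, the returned ratio is 9 / 10 = 0.9; since 0.9 >= 0.8, classify()
# treats the document as scanned and routes it to OCR (assuming the text checks passed).
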
def extract_pages(src_pdf_bytes: bytes) -> bytes:
    """
    Randomly sample up to 10 pages from the PDF bytes and return them as a new PDF.

    Args:
        src_pdf_bytes: Raw bytes of the source PDF file.

    Returns:
        bytes: Bytes of the new PDF containing the sampled pages.
    """
    # Load the PDF from bytes
    pdf = pdfium.PdfDocument(src_pdf_bytes)

    # Number of pages in the PDF
    total_page = len(pdf)
    if total_page == 0:
        # The PDF has no pages; return an empty document
        logger.warning("PDF is empty, returning an empty document")
        return b''

    # Sample at most 10 pages
    select_page_cnt = min(10, total_page)

    # Randomly choose page indices without replacement
    page_indices = np.random.choice(total_page, select_page_cnt, replace=False).tolist()

    # Create a new PDF document
    sample_docs = pdfium.PdfDocument.new()
    try:
        # Import the selected pages into the new document
        sample_docs.import_pages(pdf, page_indices)

        # Save the new PDF into an in-memory buffer
        output_buffer = BytesIO()
        sample_docs.save(output_buffer)

        # Return the bytes
        return output_buffer.getvalue()
    except Exception as e:
        logger.exception(e)
        return b''  # Return empty bytes on failure

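# Note: extract_pages() samples pages with np.random.choice and no fixed seed, so
# repeated runs on the same PDF may inspect different pages and, for borderline
# documents, produce different classifications. If deterministic behaviour is
# needed, one option (not done here) is to seed NumPy's global RNG beforehand,
# e.g. np.random.seed(0).
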
def detect_invalid_chars(sample_pdf_bytes: bytes) -> bool:
    """
    Detect whether the PDF contains invalid (garbled) characters.
    """
    '''pdfminer is slow, so this runs on a pre-sampled PDF of roughly 10 random pages'''
    # sample_pdf_bytes = extract_pages(src_pdf_bytes)
    sample_pdf_file_like_object = BytesIO(sample_pdf_bytes)
    laparams = LAParams(
        line_overlap=0.5,
        char_margin=2.0,
        line_margin=0.5,
        word_margin=0.1,
        boxes_flow=None,
        detect_vertical=False,
        all_texts=False,
    )
    text = extract_text(pdf_file=sample_pdf_file_like_object, laparams=laparams)
    text = text.replace("\n", "")
    # logger.info(text)

    '''Garbled text extracted by pdfminer shows up as (cid:xxx) tokens'''
    cid_pattern = re.compile(r'\(cid:\d+\)')
    matches = cid_pattern.findall(text)
    cid_count = len(matches)
    cid_len = sum(len(match) for match in matches)
    text_len = len(text)
    if text_len == 0:
        cid_chars_ratio = 0
    else:
        cid_chars_ratio = cid_count / (cid_count + text_len - cid_len)
    # logger.debug(f"cid_count: {cid_count}, text_len: {text_len}, cid_chars_ratio: {cid_chars_ratio}")

    '''If more than 5% of the text is garbled, treat the document as a garbled document'''
    if cid_chars_ratio > 0.05:
        return True  # Garbled document
    else:
        return False  # Normal document

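# Illustrative example for detect_invalid_chars() (hypothetical numbers): with 1000
# extracted characters of which 10 are "(cid:123)" tokens (9 characters each, so
# cid_len = 90), the ratio is 10 / (10 + 1000 - 90) = 10 / 920 ≈ 0.011, which is
# below the 0.05 threshold, so the text is considered normal.
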
if __name__ == '__main__':
    with open('/Users/myhloli/pdf/luanma2x10.pdf', 'rb') as f:
        p_bytes = f.read()
        logger.info(f"PDF classification result: {classify(p_bytes)}")