pdf_check.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import fitz
  2. import numpy as np
  3. from loguru import logger
  4. import re
  5. from io import BytesIO
  6. from pdfminer.high_level import extract_text
  7. from pdfminer.layout import LAParams
  def calculate_sample_count(total_page: int, max_sample_count: int = 10) -> int:
      """Return how many pages should be sampled from a document.

      The result is simply the page total capped at ``max_sample_count``;
      no sampling rate is involved.

      Args:
          total_page: Number of pages in the source document.
          max_sample_count: Upper bound on the number of sampled pages.
              Defaults to 10, the cap that used to be hard-coded.

      Returns:
          ``min(max_sample_count, total_page)``, never less than zero.
      """
      # Clamp at zero so a nonsensical negative input is never passed on as a
      # negative sample size.
      return max(0, min(max_sample_count, total_page))
  def extract_pages(src_pdf_bytes: bytes) -> fitz.Document:
      """Build a new document from a random sample of the source PDF's pages.

      Args:
          src_pdf_bytes: Raw bytes of the source PDF.

      Returns:
          A new ``fitz.Document`` holding the pages chosen at random (without
          replacement) from the source; the number of pages is decided by
          ``calculate_sample_count``. The document is empty when the source
          has no pages. If inserting a page fails, the error is logged and
          the pages gathered so far are returned. The caller owns the
          returned document and is responsible for closing it.
      """
      # The context manager closes the source document on every exit path,
      # including the early return for an empty PDF.
      with fitz.open("pdf", src_pdf_bytes) as pdf_docs:
          total_page = len(pdf_docs)
          if total_page == 0:
              # Nothing to sample from: hand back an empty document.
              logger.warning("PDF is empty, return empty document")
              return fitz.Document()

          select_page_cnt = calculate_sample_count(total_page)
          page_num = np.random.choice(total_page, select_page_cnt, replace=False)

          sample_docs = fitz.Document()
          try:
              for index in page_num:
                  # numpy integers are converted to plain ints for PyMuPDF.
                  page_index = int(index)
                  sample_docs.insert_pdf(
                      pdf_docs, from_page=page_index, to_page=page_index
                  )
          except Exception as e:
              # Best effort: keep whatever pages were copied before the failure.
              logger.exception(e)
      return sample_docs
  def detect_invalid_chars(src_pdf_bytes: bytes) -> bool:
      """Check whether a PDF's text layer is free of garbled characters.

      pdfminer is slow, so only a random sample of pages (see
      ``extract_pages``) is examined. Text that cannot be decoded is emitted
      by pdfminer as ``(cid:NNN)`` placeholders; each placeholder is counted
      as one garbled character.

      Args:
          src_pdf_bytes: Raw bytes of the PDF to inspect.

      Returns:
          ``True`` for a normal document, ``False`` when more than 5% of the
          sampled characters are ``(cid:NNN)`` placeholders. A PDF that yields
          no pages or no text is reported as normal.
      """
      sample_docs = extract_pages(src_pdf_bytes)
      try:
          if len(sample_docs) == 0:
              # An empty sample has no text to judge; treat it like the
              # "no text" case below. This also avoids calling tobytes() on a
              # zero-page document, which PyMuPDF is expected to reject.
              return True
          sample_pdf_bytes = sample_docs.tobytes()
      finally:
          # The sampled document is only needed for serialization.
          sample_docs.close()

      sample_pdf_file_like_object = BytesIO(sample_pdf_bytes)
      laparams = LAParams(
          line_overlap=0.5,
          char_margin=2.0,
          line_margin=0.5,
          word_margin=0.1,
          boxes_flow=None,
          detect_vertical=False,
          all_texts=False,
      )
      text = extract_text(pdf_file=sample_pdf_file_like_object, laparams=laparams)
      text = text.replace("\n", "")

      # Garbled text shows up in pdfminer output as "(cid:xxx)".
      cid_pattern = re.compile(r'\(cid:\d+\)')
      matches = cid_pattern.findall(text)
      cid_count = len(matches)
      cid_len = sum(len(match) for match in matches)
      text_len = len(text)
      if text_len == 0:
          cid_chars_radio = 0
      else:
          # Each placeholder counts as a single character, so the denominator
          # is the text length with every placeholder collapsed to length 1.
          cid_chars_radio = cid_count / (cid_count + text_len - cid_len)
      logger.info(f"cid_count: {cid_count}, text_len: {text_len}, cid_chars_radio: {cid_chars_radio}")

      # More than 5% garbled characters marks the whole document as garbled.
      return not cid_chars_radio > 0.05
  def count_replacement_characters(text: str) -> int:
      """Return the number of U+FFFD replacement characters in ``text``."""
      replacement_char = chr(0xFFFD)
      return text.count(replacement_char)
  def detect_invalid_chars_by_pymupdf(src_pdf_bytes: bytes) -> bool:
      """Check whether a PDF's text layer is free of U+FFFD characters.

      A random sample of pages (see ``extract_pages``) is read with PyMuPDF
      and the share of U+FFFD replacement characters in the extracted text is
      measured.

      Args:
          src_pdf_bytes: Raw bytes of the PDF to inspect.

      Returns:
          ``True`` for a normal document, ``False`` when more than 1% of the
          sampled characters are U+FFFD. A PDF that yields no text is reported
          as normal.
      """
      sample_docs = extract_pages(src_pdf_bytes)
      try:
          text_flags = fitz.TEXT_PRESERVE_WHITESPACE | fitz.TEXT_MEDIABOX_CLIP
          # Join once instead of growing a string page by page.
          doc_text = "".join(
              page.get_text('text', flags=text_flags) for page in sample_docs
          )
      finally:
          # The sampled document is no longer needed once its text is read.
          sample_docs.close()

      text_len = len(doc_text)
      uffd_count = count_replacement_characters(doc_text)
      if text_len == 0:
          uffd_chars_radio = 0
      else:
          uffd_chars_radio = uffd_count / text_len
      logger.info(f"uffd_count: {uffd_count}, text_len: {text_len}, uffd_chars_radio: {uffd_chars_radio}")

      # More than 1% garbled characters marks the whole document as garbled.
      return not uffd_chars_radio > 0.01