# UNIPipe.py (5.6 KB)
  1. import json
  2. from loguru import logger
  3. from magic_pdf.dict2md.mkcontent import mk_universal_format, mk_mm_markdown
  4. from magic_pdf.dict2md.ocr_mkcontent import make_standard_format_with_para, ocr_mk_mm_markdown_with_para
  5. from magic_pdf.filter.pdf_classify_by_type import classify
  6. from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
  7. from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
  8. from magic_pdf.io.DiskReaderWriter import DiskReaderWriter
  9. from magic_pdf.libs.commons import join_path
  10. from magic_pdf.libs.detect_language_from_model import get_language_from_model
  11. from magic_pdf.libs.drop_reason import DropReason
  12. from magic_pdf.libs.json_compressor import JsonCompressor
  13. from magic_pdf.user_api import parse_union_pdf, parse_ocr_pdf
  14. class UNIPipe:
  15. def __init__(self):
  16. pass
  17. @staticmethod
  18. def classify(pdf_bytes: bytes) -> str:
  19. """
  20. 根据pdf的元数据,判断是否是文本pdf,还是ocr pdf
  21. """
  22. pdf_meta = pdf_meta_scan(pdf_bytes)
  23. if pdf_meta.get("_need_drop", False): # 如果返回了需要丢弃的标志,则抛出异常
  24. raise Exception(f"pdf meta_scan need_drop,reason is {pdf_meta['_drop_reason']}")
  25. else:
  26. is_encrypted = pdf_meta["is_encrypted"]
  27. is_needs_password = pdf_meta["is_needs_password"]
  28. if is_encrypted or is_needs_password: # 加密的,需要密码的,没有页面的,都不处理
  29. raise Exception(f"pdf meta_scan need_drop,reason is {DropReason.ENCRYPTED}")
  30. else:
  31. is_text_pdf, results = classify(
  32. pdf_meta["total_page"],
  33. pdf_meta["page_width_pts"],
  34. pdf_meta["page_height_pts"],
  35. pdf_meta["image_info_per_page"],
  36. pdf_meta["text_len_per_page"],
  37. pdf_meta["imgs_per_page"],
  38. pdf_meta["text_layout_per_page"],
  39. )
  40. if is_text_pdf:
  41. return "txt"
  42. else:
  43. return "ocr"
  44. def parse(self, pdf_bytes: bytes, image_writer, jso_useful_key) -> dict:
  45. """
  46. 根据pdf类型,解析pdf
  47. """
  48. text_language = get_language_from_model(jso_useful_key['model_list'])
  49. allow_language = ["zh", "en"] # 允许的语言,目前只允许简中和英文的
  50. logger.info(f"pdf text_language is {text_language}")
  51. if text_language not in allow_language: # 如果语言不在允许的语言中,则drop
  52. raise Exception(f"pdf meta_scan need_drop,reason is {DropReason.NOT_ALLOW_LANGUAGE}")
  53. else:
  54. if jso_useful_key['_pdf_type'] == "txt":
  55. pdf_mid_data = parse_union_pdf(pdf_bytes, jso_useful_key['model_list'], image_writer)
  56. elif jso_useful_key['_pdf_type'] == "ocr":
  57. pdf_mid_data = parse_ocr_pdf(pdf_bytes, jso_useful_key['model_list'], image_writer)
  58. else:
  59. raise Exception(f"pdf type is not txt or ocr")
  60. return JsonCompressor.compress_json(pdf_mid_data)
  61. @staticmethod
  62. def mk_uni_format(pdf_mid_data: str, img_buket_path: str) -> list:
  63. """
  64. 根据pdf类型,生成统一格式content_list
  65. """
  66. pdf_mid_data = JsonCompressor.decompress_json(pdf_mid_data)
  67. parse_type = pdf_mid_data["_parse_type"]
  68. pdf_info_list = pdf_mid_data["pdf_info"]
  69. if parse_type == "txt":
  70. content_list = mk_universal_format(pdf_info_list, img_buket_path)
  71. elif parse_type == "ocr":
  72. content_list = make_standard_format_with_para(pdf_info_list, img_buket_path)
  73. return content_list
  74. @staticmethod
  75. def mk_markdown(pdf_mid_data: str, img_buket_path: str) -> list:
  76. """
  77. 根据pdf类型,markdown
  78. """
  79. pdf_mid_data = JsonCompressor.decompress_json(pdf_mid_data)
  80. parse_type = pdf_mid_data["_parse_type"]
  81. pdf_info_list = pdf_mid_data["pdf_info"]
  82. if parse_type == "txt":
  83. content_list = mk_universal_format(pdf_info_list, img_buket_path)
  84. md_content = mk_mm_markdown(content_list)
  85. elif parse_type == "ocr":
  86. md_content = ocr_mk_mm_markdown_with_para(pdf_info_list, img_buket_path)
  87. return md_content
  88. if __name__ == '__main__':
  89. # 测试
  90. # file_path = r"tmp/unittest/download-pdfs/数学新星网/edu_00001236.pdf"
  91. drw = DiskReaderWriter(r"D:/project/20231108code-clean")
  92. # pdf_bytes = drw.read(path=file_path, mode=AbsReaderWriter.MODE_BIN)
  93. # pdf_type = UNIPipe.classify(pdf_bytes)
  94. # logger.info(f"pdf_type is {pdf_type}")
  95. pdf_file_path = r"linshixuqiu\25536-00.pdf"
  96. model_file_path = r"linshixuqiu\25536-00.json"
  97. pdf_bytes = drw.read(path=pdf_file_path, mode=AbsReaderWriter.MODE_BIN)
  98. model_json_txt = drw.read(path=model_file_path, mode=AbsReaderWriter.MODE_TXT)
  99. pdf_type = UNIPipe.classify(pdf_bytes)
  100. logger.info(f"pdf_type is {pdf_type}")
  101. jso_useful_key = {
  102. "_pdf_type": pdf_type,
  103. "model_list": json.loads(model_json_txt),
  104. }
  105. pipe = UNIPipe()
  106. write_path = r"D:\project\20231108code-clean\linshixuqiu\25536-00"
  107. img_buket_path = "imgs"
  108. img_writer = DiskReaderWriter(join_path(write_path, img_buket_path))
  109. pdf_mid_data = pipe.parse(pdf_bytes, img_writer, jso_useful_key)
  110. md_content = pipe.mk_markdown(pdf_mid_data, "imgs")
  111. md_writer = DiskReaderWriter(write_path)
  112. md_writer.write(content=md_content, path="25536-00.md", mode=AbsReaderWriter.MODE_TXT)
  113. md_writer.write(content=json.dumps(JsonCompressor.decompress_json(pdf_mid_data), ensure_ascii=False, indent=4), path="25536-00.json", mode=AbsReaderWriter.MODE_TXT)