UNIPipe.py 4.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import json
  2. from loguru import logger
  3. from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
  4. from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
  5. from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
  6. from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
  7. from magic_pdf.libs.commons import join_path
  8. from magic_pdf.pipe.AbsPipe import AbsPipe
  9. from magic_pdf.user_api import parse_union_pdf, parse_ocr_pdf
  10. class UNIPipe(AbsPipe):
  11. def __init__(self, pdf_bytes: bytes, jso_useful_key: dict, image_writer: AbsReaderWriter, is_debug: bool = False,
  12. start_page_id=0, end_page_id=None, lang=None):
  13. self.pdf_type = jso_useful_key["_pdf_type"]
  14. super().__init__(pdf_bytes, jso_useful_key["model_list"], image_writer, is_debug, start_page_id, end_page_id, lang)
  15. if len(self.model_list) == 0:
  16. self.input_model_is_empty = True
  17. else:
  18. self.input_model_is_empty = False
  19. def pipe_classify(self):
  20. self.pdf_type = AbsPipe.classify(self.pdf_bytes)
  21. def pipe_analyze(self):
  22. if self.pdf_type == self.PIP_TXT:
  23. self.model_list = doc_analyze(self.pdf_bytes, ocr=False,
  24. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  25. lang=self.lang)
  26. elif self.pdf_type == self.PIP_OCR:
  27. self.model_list = doc_analyze(self.pdf_bytes, ocr=True,
  28. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  29. lang=self.lang)
  30. def pipe_parse(self):
  31. if self.pdf_type == self.PIP_TXT:
  32. self.pdf_mid_data = parse_union_pdf(self.pdf_bytes, self.model_list, self.image_writer,
  33. is_debug=self.is_debug, input_model_is_empty=self.input_model_is_empty,
  34. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  35. lang=self.lang)
  36. elif self.pdf_type == self.PIP_OCR:
  37. self.pdf_mid_data = parse_ocr_pdf(self.pdf_bytes, self.model_list, self.image_writer,
  38. is_debug=self.is_debug,
  39. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  40. lang=self.lang)
  41. def pipe_mk_uni_format(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
  42. result = super().pipe_mk_uni_format(img_parent_path, drop_mode)
  43. logger.info("uni_pipe mk content list finished")
  44. return result
  45. def pipe_mk_markdown(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF, md_make_mode=MakeMode.MM_MD):
  46. result = super().pipe_mk_markdown(img_parent_path, drop_mode, md_make_mode)
  47. logger.info(f"uni_pipe mk {md_make_mode} finished")
  48. return result
  49. if __name__ == '__main__':
  50. # 测试
  51. drw = DiskReaderWriter(r"D:/project/20231108code-clean")
  52. pdf_file_path = r"linshixuqiu\19983-00.pdf"
  53. model_file_path = r"linshixuqiu\19983-00.json"
  54. pdf_bytes = drw.read(pdf_file_path, AbsReaderWriter.MODE_BIN)
  55. model_json_txt = drw.read(model_file_path, AbsReaderWriter.MODE_TXT)
  56. model_list = json.loads(model_json_txt)
  57. write_path = r"D:\project\20231108code-clean\linshixuqiu\19983-00"
  58. img_bucket_path = "imgs"
  59. img_writer = DiskReaderWriter(join_path(write_path, img_bucket_path))
  60. # pdf_type = UNIPipe.classify(pdf_bytes)
  61. # jso_useful_key = {
  62. # "_pdf_type": pdf_type,
  63. # "model_list": model_list
  64. # }
  65. jso_useful_key = {
  66. "_pdf_type": "",
  67. "model_list": model_list
  68. }
  69. pipe = UNIPipe(pdf_bytes, jso_useful_key, img_writer)
  70. pipe.pipe_classify()
  71. pipe.pipe_parse()
  72. md_content = pipe.pipe_mk_markdown(img_bucket_path)
  73. content_list = pipe.pipe_mk_uni_format(img_bucket_path)
  74. md_writer = DiskReaderWriter(write_path)
  75. md_writer.write(md_content, "19983-00.md", AbsReaderWriter.MODE_TXT)
  76. md_writer.write(json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4), "19983-00.json",
  77. AbsReaderWriter.MODE_TXT)
  78. md_writer.write(str(content_list), "19983-00.txt", AbsReaderWriter.MODE_TXT)