UNIPipe.py 4.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import json
  2. from loguru import logger
  3. from magic_pdf.data.data_reader_writer import DataWriter
  4. from magic_pdf.libs.commons import join_path
  5. from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
  6. from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
  7. from magic_pdf.pipe.AbsPipe import AbsPipe
  8. from magic_pdf.user_api import parse_ocr_pdf, parse_union_pdf
  9. class UNIPipe(AbsPipe):
  10. def __init__(self, pdf_bytes: bytes, jso_useful_key: dict, image_writer: DataWriter, is_debug: bool = False,
  11. start_page_id=0, end_page_id=None, lang=None,
  12. layout_model=None, formula_enable=None, table_enable=None):
  13. self.pdf_type = jso_useful_key['_pdf_type']
  14. super().__init__(pdf_bytes, jso_useful_key['model_list'], image_writer, is_debug, start_page_id, end_page_id,
  15. lang, layout_model, formula_enable, table_enable)
  16. if len(self.model_list) == 0:
  17. self.input_model_is_empty = True
  18. else:
  19. self.input_model_is_empty = False
  20. def pipe_classify(self):
  21. self.pdf_type = AbsPipe.classify(self.pdf_bytes)
  22. def pipe_analyze(self):
  23. if self.pdf_type == self.PIP_TXT:
  24. self.model_list = doc_analyze(self.pdf_bytes, ocr=False,
  25. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  26. lang=self.lang, layout_model=self.layout_model,
  27. formula_enable=self.formula_enable, table_enable=self.table_enable)
  28. elif self.pdf_type == self.PIP_OCR:
  29. self.model_list = doc_analyze(self.pdf_bytes, ocr=True,
  30. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  31. lang=self.lang, layout_model=self.layout_model,
  32. formula_enable=self.formula_enable, table_enable=self.table_enable)
  33. def pipe_parse(self):
  34. if self.pdf_type == self.PIP_TXT:
  35. self.pdf_mid_data = parse_union_pdf(self.pdf_bytes, self.model_list, self.image_writer,
  36. is_debug=self.is_debug, input_model_is_empty=self.input_model_is_empty,
  37. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  38. lang=self.lang, layout_model=self.layout_model,
  39. formula_enable=self.formula_enable, table_enable=self.table_enable)
  40. elif self.pdf_type == self.PIP_OCR:
  41. self.pdf_mid_data = parse_ocr_pdf(self.pdf_bytes, self.model_list, self.image_writer,
  42. is_debug=self.is_debug,
  43. start_page_id=self.start_page_id, end_page_id=self.end_page_id,
  44. lang=self.lang)
  45. def pipe_mk_uni_format(self, img_parent_path: str, drop_mode=DropMode.NONE_WITH_REASON):
  46. result = super().pipe_mk_uni_format(img_parent_path, drop_mode)
  47. logger.info('uni_pipe mk content list finished')
  48. return result
  49. def pipe_mk_markdown(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF, md_make_mode=MakeMode.MM_MD):
  50. result = super().pipe_mk_markdown(img_parent_path, drop_mode, md_make_mode)
  51. logger.info(f'uni_pipe mk {md_make_mode} finished')
  52. return result
  53. if __name__ == '__main__':
  54. # 测试
  55. from magic_pdf.data.data_reader_writer import DataReader
  56. drw = DataReader(r'D:/project/20231108code-clean')
  57. pdf_file_path = r'linshixuqiu\19983-00.pdf'
  58. model_file_path = r'linshixuqiu\19983-00.json'
  59. pdf_bytes = drw.read(pdf_file_path)
  60. model_json_txt = drw.read(model_file_path).decode()
  61. model_list = json.loads(model_json_txt)
  62. write_path = r'D:\project\20231108code-clean\linshixuqiu\19983-00'
  63. img_bucket_path = 'imgs'
  64. img_writer = DataWriter(join_path(write_path, img_bucket_path))
  65. # pdf_type = UNIPipe.classify(pdf_bytes)
  66. # jso_useful_key = {
  67. # "_pdf_type": pdf_type,
  68. # "model_list": model_list
  69. # }
  70. jso_useful_key = {
  71. '_pdf_type': '',
  72. 'model_list': model_list
  73. }
  74. pipe = UNIPipe(pdf_bytes, jso_useful_key, img_writer)
  75. pipe.pipe_classify()
  76. pipe.pipe_parse()
  77. md_content = pipe.pipe_mk_markdown(img_bucket_path)
  78. content_list = pipe.pipe_mk_uni_format(img_bucket_path)
  79. md_writer = DataWriter(write_path)
  80. md_writer.write_string('19983-00.md', md_content)
  81. md_writer.write_string('19983-00.json', json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4))
  82. md_writer.write_string('19983-00.txt', str(content_list))