demo_test.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. import json
  2. import os
  3. import sys
  4. from pathlib import Path
  5. import click
  6. from magic_pdf.dict2md.mkcontent import mk_mm_markdown
  7. from magic_pdf.pipeline import (
  8. meta_scan,
  9. classify_by_type,
  10. parse_pdf,
  11. pdf_intermediate_dict_to_markdown,
  12. save_tables_to_s3,
  13. )
  14. from magic_pdf.libs.commons import join_path, read_file, json_dump_path
  15. from app.common.s3 import get_s3_config
  16. from loguru import logger
  17. local_json_path = "Z:/format.json"
  18. local_jsonl_path = "Z:/format.jsonl"
  19. def get_json_from_local_or_s3(book_name=None):
  20. if book_name is None:
  21. with open(local_json_path, "r", encoding="utf-8") as json_file:
  22. json_line = json_file.read()
  23. json_object = json.loads(json_line)
  24. else:
  25. # error_log_path & json_dump_path
  26. # 可配置从上述两个地址获取源json
  27. json_path = join_path(json_dump_path, book_name + ".json")
  28. s3_config = get_s3_config(json_path)
  29. file_content = read_file(json_path, s3_config)
  30. json_str = file_content.decode("utf-8")
  31. # logger.info(json_str)
  32. json_object = json.loads(json_str)
  33. return json_object
  34. def write_json_to_local(jso, book_name=None):
  35. if book_name is None:
  36. with open(local_json_path, "w", encoding="utf-8") as file:
  37. file.write(json.dumps(jso, ensure_ascii=False))
  38. else:
  39. pass
  40. def demo_parse_pdf(book_name=None, start_page_id=0, debug_mode=True):
  41. json_object = get_json_from_local_or_s3(book_name)
  42. jso = parse_pdf(json_object, start_page_id=start_page_id, debug_mode=debug_mode)
  43. logger.info(f"pdf_parse_time: {jso['parse_time']}")
  44. write_json_to_local(jso, book_name)
  45. jso_md = pdf_intermediate_dict_to_markdown(jso, debug_mode=debug_mode)
  46. content = jso_md.get("content_list")
  47. markdown_content = mk_mm_markdown(content)
  48. if book_name is not None:
  49. save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest", "md", book_name)
  50. uni_format_save_path = join_path(save_tmp_path, "book" + ".json")
  51. markdown_save_path = join_path(save_tmp_path, "book" + ".md")
  52. with open(uni_format_save_path, "w", encoding="utf-8") as f:
  53. f.write(json.dumps(content, ensure_ascii=False, indent=4))
  54. with open(markdown_save_path, "w", encoding="utf-8") as f:
  55. f.write(markdown_content)
  56. else:
  57. logger.info(json.dumps(content, ensure_ascii=False))
  58. def demo_save_tables(book_name=None, start_page_id=0, debug_mode=True):
  59. json_object = get_json_from_local_or_s3(book_name)
  60. jso = parse_pdf(json_object, start_page_id=start_page_id, debug_mode=debug_mode)
  61. logger.info(f"pdf_parse_time: {jso['parse_time']}")
  62. write_json_to_local(jso, book_name)
  63. save_tables_to_s3(jso, debug_mode=debug_mode)
  64. def demo_classify_by_type(book_name=None, debug_mode=True):
  65. json_object = get_json_from_local_or_s3(book_name)
  66. jso = classify_by_type(json_object, debug_mode=debug_mode)
  67. logger.info(json.dumps(jso, ensure_ascii=False))
  68. logger.info(f"classify_time: {jso['classify_time']}")
  69. write_json_to_local(jso, book_name)
  70. def demo_meta_scan(book_name=None, debug_mode=True):
  71. json_object = get_json_from_local_or_s3(book_name)
  72. # doc_layout_check=False
  73. jso = meta_scan(json_object, doc_layout_check=True)
  74. logger.info(json.dumps(jso, ensure_ascii=False))
  75. logger.info(f"meta_scan_time: {jso['meta_scan_time']}")
  76. write_json_to_local(jso, book_name)
  77. def demo_meta_scan_from_jsonl():
  78. with open(local_jsonl_path, "r", encoding="utf-8") as jsonl_file:
  79. for line in jsonl_file:
  80. jso = json.loads(line)
  81. jso = meta_scan(jso)
  82. logger.info(f"pdf_path: {jso['content']['pdf_path']}")
  83. logger.info(f"read_file_time: {jso['read_file_time']}")
  84. logger.info(f"meta_scan_time: {jso['meta_scan_time']}")
  85. def demo_test5():
  86. with open(local_json_path, "r", encoding="utf-8") as json_file:
  87. json_line = json_file.read()
  88. jso = json.loads(json_line)
  89. img_list_len = len(jso["content"]["image_info_per_page"])
  90. logger.info(f"img_list_len: {img_list_len}")
  91. def read_more_para_test_samples(type="scihub"):
  92. # 读取多段落测试样本
  93. curr_dir = Path(__file__).parent
  94. files_path = ""
  95. if type == "gift":
  96. relative_path = "../tests/assets/more_para_test_samples/gift_files.txt"
  97. files_path = os.path.join(curr_dir, relative_path)
  98. if type == "scihub":
  99. relative_path = "../tests/assets/more_para_test_samples/scihub_files.txt"
  100. files_path = os.path.join(curr_dir, relative_path)
  101. if type == "zlib":
  102. relative_path = "../tests/assets/more_para_test_samples/zlib_files.txt"
  103. files_path = os.path.join(curr_dir, relative_path)
  104. # check if file exists
  105. if not os.path.exists(files_path):
  106. print("File not exist!")
  107. sys.exit(0)
  108. with open(files_path, "r", encoding="utf-8") as f:
  109. lines = f.readlines()
  110. # print("lines", lines)
  111. return lines
  112. def batch_test_more_para(type="scihub"):
  113. # 批量测试多段落
  114. para_test_files = read_more_para_test_samples(type)
  115. for file in para_test_files:
  116. file = file.strip()
  117. print(file)
  118. demo_parse_pdf(book_name=file)
  119. @click.command()
  120. @click.option("--book-name", help="s3上pdf文件的路径")
  121. def main(book_name: str):
  122. demo_parse_pdf(book_name, start_page_id=0)
  123. if __name__ == "__main__":
  124. main()