# text_demo.py
  1. import json
  2. import os
  3. import sys
  4. from pathlib import Path
  5. import click
  6. from demo.demo_commons import get_json_from_local_or_s3, write_json_to_local, local_jsonl_path, local_json_path
  7. from magic_pdf.dict2md.mkcontent import mk_mm_markdown, mk_universal_format
  8. from magic_pdf.filter.pdf_classify_by_type import classify
  9. from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
  10. from magic_pdf.libs.commons import join_path, read_file
  11. from loguru import logger
  12. from magic_pdf.libs.config_reader import get_s3_config_dict
  13. from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
  14. from magic_pdf.spark.base import get_data_source
  15. def demo_parse_pdf(book_name=None, start_page_id=0, debug_mode=True):
  16. json_object = get_json_from_local_or_s3(book_name)
  17. s3_pdf_path = json_object.get("file_location")
  18. s3_config = get_s3_config_dict(s3_pdf_path)
  19. pdf_bytes = read_file(s3_pdf_path, s3_config)
  20. model_output_json_list = json_object.get("doc_layout_result")
  21. data_source = get_data_source(json_object)
  22. file_id = json_object.get("file_id")
  23. junk_img_bojids = json_object["pdf_meta"]["junk_img_bojids"]
  24. save_path = ""
  25. pdf_info_dict = parse_pdf_by_txt(
  26. pdf_bytes,
  27. model_output_json_list,
  28. save_path,
  29. f"{data_source}/{file_id}",
  30. pdf_model_profile=None,
  31. start_page_id=start_page_id,
  32. junk_img_bojids=junk_img_bojids,
  33. debug_mode=debug_mode,
  34. )
  35. write_json_to_local(pdf_info_dict, book_name)
  36. content_list = mk_universal_format(pdf_info_dict)
  37. markdown_content = mk_mm_markdown(content_list)
  38. if book_name is not None:
  39. save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest", "md", book_name)
  40. uni_format_save_path = join_path(save_tmp_path, "book" + ".json")
  41. markdown_save_path = join_path(save_tmp_path, "book" + ".md")
  42. with open(uni_format_save_path, "w", encoding="utf-8") as f:
  43. f.write(json.dumps(content_list, ensure_ascii=False, indent=4))
  44. with open(markdown_save_path, "w", encoding="utf-8") as f:
  45. f.write(markdown_content)
  46. else:
  47. logger.info(json.dumps(content_list, ensure_ascii=False))
  48. def demo_classify_by_type(book_name=None, debug_mode=True):
  49. json_object = get_json_from_local_or_s3(book_name)
  50. pdf_meta = json_object.get("pdf_meta")
  51. total_page = pdf_meta["total_page"]
  52. page_width = pdf_meta["page_width_pts"]
  53. page_height = pdf_meta["page_height_pts"]
  54. img_sz_list = pdf_meta["image_info_per_page"]
  55. img_num_list = pdf_meta["imgs_per_page"]
  56. text_len_list = pdf_meta["text_len_per_page"]
  57. text_layout_list = pdf_meta["text_layout_per_page"]
  58. pdf_path = json_object.get("file_location")
  59. is_text_pdf, results = classify(
  60. pdf_path,
  61. total_page,
  62. page_width,
  63. page_height,
  64. img_sz_list,
  65. text_len_list,
  66. img_num_list,
  67. text_layout_list,
  68. )
  69. logger.info(f"is_text_pdf: {is_text_pdf}")
  70. logger.info(json.dumps(results, ensure_ascii=False))
  71. write_json_to_local(results, book_name)
  72. def demo_meta_scan(book_name=None, debug_mode=True):
  73. json_object = get_json_from_local_or_s3(book_name)
  74. s3_pdf_path = json_object.get("file_location")
  75. s3_config = get_s3_config_dict(s3_pdf_path)
  76. pdf_bytes = read_file(s3_pdf_path, s3_config)
  77. res = pdf_meta_scan(s3_pdf_path, pdf_bytes)
  78. logger.info(json.dumps(res, ensure_ascii=False))
  79. write_json_to_local(res, book_name)
  80. def demo_test5():
  81. with open(local_json_path, "r", encoding="utf-8") as json_file:
  82. json_line = json_file.read()
  83. jso = json.loads(json_line)
  84. img_list_len = len(jso["content"]["image_info_per_page"])
  85. logger.info(f"img_list_len: {img_list_len}")
  86. def read_more_para_test_samples(type="scihub"):
  87. # 读取多段落测试样本
  88. curr_dir = Path(__file__).parent
  89. files_path = ""
  90. if type == "gift":
  91. relative_path = "../tests/assets/more_para_test_samples/gift_files.txt"
  92. files_path = os.path.join(curr_dir, relative_path)
  93. if type == "scihub":
  94. relative_path = "../tests/assets/more_para_test_samples/scihub_files.txt"
  95. files_path = os.path.join(curr_dir, relative_path)
  96. if type == "zlib":
  97. relative_path = "../tests/assets/more_para_test_samples/zlib_files.txt"
  98. files_path = os.path.join(curr_dir, relative_path)
  99. # check if file exists
  100. if not os.path.exists(files_path):
  101. print("File not exist!")
  102. sys.exit(0)
  103. with open(files_path, "r", encoding="utf-8") as f:
  104. lines = f.readlines()
  105. # print("lines", lines)
  106. return lines
  107. def batch_test_more_para(type="scihub"):
  108. # 批量测试多段落
  109. para_test_files = read_more_para_test_samples(type)
  110. for file in para_test_files:
  111. file = file.strip()
  112. print(file)
  113. demo_parse_pdf(book_name=file)
  114. @click.command()
  115. @click.option("--book-name", help="s3上pdf文件的路径")
  116. def main(book_name: str):
  117. demo_parse_pdf(book_name, start_page_id=0)
  118. if __name__ == "__main__":
  119. main()