spark_api.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. """
  2. 用户输入:
  3. model数组,每个元素代表一个页面
  4. pdf在s3的路径
  5. 截图保存的s3位置
  6. 然后:
  7. 1)根据s3路径,调用spark集群的api,拿到ak,sk,endpoint,构造出s3PDFReader
  8. 2)根据用户输入的s3地址,调用spark集群的api,拿到ak,sk,endpoint,构造出s3ImageWriter
  9. 其余部分至于构造s3cli, 获取ak,sk都在code-clean里写代码完成。不要反向依赖!!!
  10. """
  11. from loguru import logger
  12. from magic_pdf.io import AbsReaderWriter
  13. from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
  14. from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
  15. def parse_txt_pdf(pdf_bytes:bytes, pdf_models:list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args, **kwargs):
  16. """
  17. 解析文本类pdf
  18. """
  19. pdf_info_dict = parse_pdf_by_txt(
  20. pdf_bytes,
  21. pdf_models,
  22. imageWriter,
  23. start_page_id=start_page,
  24. debug_mode=is_debug,
  25. )
  26. pdf_info_dict["parse_type"] = "txt"
  27. return pdf_info_dict
  28. def parse_ocr_pdf(pdf_bytes:bytes, pdf_models:list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args, **kwargs):
  29. """
  30. 解析ocr类pdf
  31. """
  32. pdf_info_dict = parse_pdf_by_ocr(
  33. pdf_bytes,
  34. pdf_models,
  35. imageWriter,
  36. start_page_id=start_page,
  37. debug_mode=is_debug,
  38. )
  39. pdf_info_dict["_parse_type"] = "ocr"
  40. return pdf_info_dict
  41. def parse_union_pdf(pdf_bytes:bytes, pdf_models:list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args, **kwargs):
  42. """
  43. ocr和文本混合的pdf,全部解析出来
  44. """
  45. def parse_pdf(method):
  46. try:
  47. return method(
  48. pdf_bytes,
  49. pdf_models,
  50. imageWriter,
  51. start_page_id=start_page,
  52. debug_mode=is_debug,
  53. )
  54. except Exception as e:
  55. logger.error(f"{method.__name__} error: {e}")
  56. return None
  57. pdf_info_dict = parse_pdf(parse_pdf_by_txt)
  58. if pdf_info_dict is None or pdf_info_dict.get("_need_drop", False):
  59. logger.warning(f"parse_pdf_by_txt drop or error, switch to parse_pdf_by_ocr")
  60. pdf_info_dict = parse_pdf(parse_pdf_by_ocr)
  61. if pdf_info_dict is None:
  62. raise Exception("Both parse_pdf_by_txt and parse_pdf_by_ocr failed.")
  63. else:
  64. pdf_info_dict["_parse_type"] = "ocr"
  65. else:
  66. pdf_info_dict["_parse_type"] = "txt"
  67. return pdf_info_dict
  68. def spark_json_extractor(jso: dict) -> dict:
  69. """
  70. 从json中提取数据,返回一个dict
  71. """
  72. return {
  73. "_pdf_type": jso["_pdf_type"],
  74. "model_list": jso["doc_layout_result"],
  75. }