pipeline_ocr.bak 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import sys
  2. import time
  3. from loguru import logger
  4. from magic_pdf.dict2md.ocr_mkcontent import ocr_mk_mm_markdown, ocr_mk_nlp_markdown_with_para, \
  5. ocr_mk_mm_markdown_with_para_and_pagination, ocr_mk_mm_markdown_with_para, ocr_mk_mm_standard_format, \
  6. make_standard_format_with_para
  7. from magic_pdf.libs.commons import join_path, s3_image_save_path, formatted_time
  8. from magic_pdf.libs.json_compressor import JsonCompressor
  9. from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
  10. from magic_pdf.spark.base import get_data_source, exception_handler, get_pdf_bytes, get_bookname
  11. from magic_pdf.spark.s3 import get_s3_config
  12. def ocr_pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
  13. if debug_mode:
  14. pass
  15. else: # 如果debug没开,则检测是否有needdrop字段
  16. if jso.get("_need_drop", False):
  17. book_name = join_path(get_data_source(jso), jso["file_id"])
  18. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  19. jso["dropped"] = True
  20. return jso
  21. try:
  22. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  23. # 将 pdf_intermediate_dict 解压
  24. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  25. markdown_content = ocr_mk_mm_markdown(pdf_intermediate_dict)
  26. jso["content"] = markdown_content
  27. logger.info(
  28. f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
  29. file=sys.stderr,
  30. )
  31. # 把无用的信息清空
  32. jso["doc_layout_result"] = ""
  33. jso["pdf_intermediate_dict"] = ""
  34. jso["pdf_meta"] = ""
  35. except Exception as e:
  36. jso = exception_handler(jso, e)
  37. return jso
  38. def ocr_pdf_intermediate_dict_to_markdown_with_para(jso: dict, mode, debug_mode=False) -> dict:
  39. if debug_mode:
  40. pass
  41. else: # 如果debug没开,则检测是否有needdrop字段
  42. if jso.get("_need_drop", False):
  43. book_name = join_path(get_data_source(jso), jso["file_id"])
  44. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  45. jso["dropped"] = True
  46. return jso
  47. try:
  48. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  49. # 将 pdf_intermediate_dict 解压
  50. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  51. if mode == "mm":
  52. markdown_content = ocr_mk_mm_markdown_with_para(pdf_intermediate_dict)
  53. elif mode == "nlp":
  54. markdown_content = ocr_mk_nlp_markdown_with_para(pdf_intermediate_dict)
  55. jso["content"] = markdown_content
  56. logger.info(
  57. f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
  58. file=sys.stderr,
  59. )
  60. # 把无用的信息清空
  61. jso["doc_layout_result"] = ""
  62. jso["pdf_intermediate_dict"] = ""
  63. jso["pdf_meta"] = ""
  64. except Exception as e:
  65. jso = exception_handler(jso, e)
  66. return jso
  67. def ocr_pdf_intermediate_dict_to_markdown_with_para_and_pagination(jso: dict, debug_mode=False) -> dict:
  68. if debug_mode:
  69. pass
  70. else: # 如果debug没开,则检测是否有needdrop字段
  71. if jso.get("_need_drop", False):
  72. book_name = join_path(get_data_source(jso), jso["file_id"])
  73. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  74. jso["dropped"] = True
  75. return jso
  76. try:
  77. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  78. # 将 pdf_intermediate_dict 解压
  79. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  80. markdown_content = ocr_mk_mm_markdown_with_para_and_pagination(pdf_intermediate_dict)
  81. jso["content"] = markdown_content
  82. logger.info(
  83. f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
  84. file=sys.stderr,
  85. )
  86. # 把无用的信息清空
  87. # jso["doc_layout_result"] = ""
  88. jso["pdf_intermediate_dict"] = ""
  89. # jso["pdf_meta"] = ""
  90. except Exception as e:
  91. jso = exception_handler(jso, e)
  92. return jso
  93. def ocr_pdf_intermediate_dict_to_markdown_with_para_for_qa(
  94. jso: dict, debug_mode=False
  95. ) -> dict:
  96. if debug_mode:
  97. pass
  98. else: # 如果debug没开,则检测是否有needdrop字段
  99. if jso.get("_need_drop", False):
  100. book_name = join_path(get_data_source(jso), jso["file_id"])
  101. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  102. jso["dropped"] = True
  103. return jso
  104. try:
  105. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  106. # 将 pdf_intermediate_dict 解压
  107. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  108. markdown_content = ocr_mk_mm_markdown_with_para(pdf_intermediate_dict)
  109. jso["content_ocr"] = markdown_content
  110. logger.info(
  111. f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
  112. file=sys.stderr,
  113. )
  114. # 把无用的信息清空
  115. jso["doc_layout_result"] = ""
  116. jso["pdf_intermediate_dict"] = ""
  117. jso["mid_json_ocr"] = pdf_intermediate_dict
  118. jso["pdf_meta"] = ""
  119. except Exception as e:
  120. jso = exception_handler(jso, e)
  121. return jso
  122. def ocr_pdf_intermediate_dict_to_standard_format(jso: dict, debug_mode=False) -> dict:
  123. if debug_mode:
  124. pass
  125. else: # 如果debug没开,则检测是否有needdrop字段
  126. if jso.get("_need_drop", False):
  127. book_name = join_path(get_data_source(jso), jso["file_id"])
  128. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  129. jso["dropped"] = True
  130. return jso
  131. try:
  132. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  133. # 将 pdf_intermediate_dict 解压
  134. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  135. standard_format = ocr_mk_mm_standard_format(pdf_intermediate_dict)
  136. jso["content_list"] = standard_format
  137. logger.info(
  138. f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",
  139. file=sys.stderr,
  140. )
  141. # 把无用的信息清空
  142. jso["doc_layout_result"] = ""
  143. jso["pdf_intermediate_dict"] = ""
  144. jso["pdf_meta"] = ""
  145. except Exception as e:
  146. jso = exception_handler(jso, e)
  147. return jso
  148. def ocr_pdf_intermediate_dict_to_standard_format_with_para(jso: dict, debug_mode=False) -> dict:
  149. if debug_mode:
  150. pass
  151. else: # 如果debug没开,则检测是否有needdrop字段
  152. if jso.get("_need_drop", False):
  153. book_name = join_path(get_data_source(jso), jso["file_id"])
  154. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  155. jso["dropped"] = True
  156. return jso
  157. try:
  158. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  159. # 将 pdf_intermediate_dict 解压
  160. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  161. standard_format = make_standard_format_with_para(pdf_intermediate_dict)
  162. jso["content_list"] = standard_format
  163. logger.info(
  164. f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",
  165. file=sys.stderr,
  166. )
  167. # 把无用的信息清空
  168. jso["doc_layout_result"] = ""
  169. jso["pdf_intermediate_dict"] = ""
  170. jso["pdf_meta"] = ""
  171. except Exception as e:
  172. jso = exception_handler(jso, e)
  173. return jso
  174. def ocr_parse_pdf_core(pdf_bytes, model_output_json_list, book_name, start_page_id=0, debug_mode=False):
  175. save_path = s3_image_save_path
  176. image_s3_config = get_s3_config(save_path)
  177. start_time = time.time() # 记录开始时间
  178. # 先打印一下book_name和解析开始的时间
  179. logger.info(
  180. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  181. file=sys.stderr,
  182. )
  183. pdf_info_dict = parse_pdf_by_ocr(
  184. pdf_bytes,
  185. model_output_json_list,
  186. save_path,
  187. book_name,
  188. pdf_model_profile=None,
  189. image_s3_config=image_s3_config,
  190. start_page_id=start_page_id,
  191. debug_mode=debug_mode,
  192. )
  193. end_time = time.time() # 记录完成时间
  194. parse_time = int(end_time - start_time) # 计算执行时间
  195. # 解析完成后打印一下book_name和耗时
  196. logger.info(
  197. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  198. file=sys.stderr,
  199. )
  200. return pdf_info_dict, parse_time
  201. # 专门用来跑被drop的pdf,跑完之后需要把need_drop字段置为false
  202. def ocr_dropped_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  203. if not jso.get("_need_drop", False):
  204. return jso
  205. else:
  206. try:
  207. pdf_bytes = get_pdf_bytes(jso)
  208. model_output_json_list = jso.get("doc_layout_result")
  209. book_name = get_bookname(jso)
  210. pdf_info_dict, parse_time = ocr_parse_pdf_core(
  211. pdf_bytes, model_output_json_list, book_name, start_page_id=start_page_id, debug_mode=debug_mode
  212. )
  213. jso["pdf_intermediate_dict"] = JsonCompressor.compress_json(pdf_info_dict)
  214. jso["parse_time"] = parse_time
  215. jso["_need_drop"] = False
  216. except Exception as e:
  217. jso = exception_handler(jso, e)
  218. return jso
  219. def ocr_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  220. # 检测debug开关
  221. if debug_mode:
  222. pass
  223. else: # 如果debug没开,则检测是否有needdrop字段
  224. if jso.get("_need_drop", False):
  225. return jso
  226. try:
  227. pdf_bytes = get_pdf_bytes(jso)
  228. model_output_json_list = jso.get("doc_layout_result")
  229. book_name = get_bookname(jso)
  230. pdf_info_dict, parse_time = ocr_parse_pdf_core(pdf_bytes, model_output_json_list, book_name,
  231. start_page_id=start_page_id, debug_mode=debug_mode)
  232. jso["pdf_intermediate_dict"] = JsonCompressor.compress_json(pdf_info_dict)
  233. jso["parse_time"] = parse_time
  234. except Exception as e:
  235. jso = exception_handler(jso, e)
  236. return jso