pipeline.bak 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. # coding=utf8
  2. import sys
  3. import time
  4. from urllib.parse import quote
  5. from magic_pdf.libs.commons import (
  6. read_file,
  7. join_path,
  8. parse_bucket_key,
  9. formatted_time,
  10. s3_image_save_path,
  11. )
  12. from magic_pdf.libs.drop_reason import DropReason
  13. from magic_pdf.libs.json_compressor import JsonCompressor
  14. from magic_pdf.dict2md.mkcontent import mk_universal_format
  15. from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
  16. from magic_pdf.filter.pdf_classify_by_type import classify
  17. from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
  18. from loguru import logger
  19. from magic_pdf.pdf_parse_for_train import parse_pdf_for_train
  20. from magic_pdf.spark.base import exception_handler, get_data_source
  21. from magic_pdf.train_utils.convert_to_train_format import convert_to_train_format
  22. from magic_pdf.spark.s3 import get_s3_config
  23. def meta_scan(jso: dict, doc_layout_check=True) -> dict:
  24. s3_pdf_path = jso.get("file_location")
  25. s3_config = get_s3_config(s3_pdf_path)
  26. if doc_layout_check:
  27. if (
  28. "doc_layout_result" not in jso
  29. ): # 检测json中是存在模型数据,如果没有则需要跳过该pdf
  30. jso["_need_drop"] = True
  31. jso["_drop_reason"] = DropReason.MISS_DOC_LAYOUT_RESULT
  32. return jso
  33. try:
  34. data_source = get_data_source(jso)
  35. file_id = jso.get("file_id")
  36. book_name = f"{data_source}/{file_id}"
  37. # 首页存在超量drawing问题
  38. # special_pdf_list = ['zlib/zlib_21822650']
  39. # if book_name in special_pdf_list:
  40. # jso['need_drop'] = True
  41. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  42. # return jso
  43. start_time = time.time() # 记录开始时间
  44. logger.info(
  45. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  46. file=sys.stderr,
  47. )
  48. file_content = read_file(s3_pdf_path, s3_config)
  49. read_file_time = int(time.time() - start_time) # 计算执行时间
  50. start_time = time.time() # 记录开始时间
  51. res = pdf_meta_scan(s3_pdf_path, file_content)
  52. if res.get(
  53. "_need_drop", False
  54. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  55. jso["_need_drop"] = True
  56. jso["_drop_reason"] = res["_drop_reason"]
  57. else: # 正常返回
  58. jso["pdf_meta"] = res
  59. jso["content"] = ""
  60. jso["remark"] = ""
  61. jso["data_url"] = ""
  62. end_time = time.time() # 记录结束时间
  63. meta_scan_time = int(end_time - start_time) # 计算执行时间
  64. logger.info(
  65. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},read_file_time is:{read_file_time},meta_scan_time is:{meta_scan_time}",
  66. file=sys.stderr,
  67. )
  68. jso["read_file_time"] = read_file_time
  69. jso["meta_scan_time"] = meta_scan_time
  70. except Exception as e:
  71. jso = exception_handler(jso, e)
  72. return jso
  73. def classify_by_type(jso: dict, debug_mode=False) -> dict:
  74. # 检测debug开关
  75. if debug_mode:
  76. pass
  77. else: # 如果debug没开,则检测是否有needdrop字段
  78. if jso.get("_need_drop", False):
  79. return jso
  80. # 开始正式逻辑
  81. try:
  82. pdf_meta = jso.get("pdf_meta")
  83. data_source = get_data_source(jso)
  84. file_id = jso.get("file_id")
  85. book_name = f"{data_source}/{file_id}"
  86. total_page = pdf_meta["total_page"]
  87. page_width = pdf_meta["page_width_pts"]
  88. page_height = pdf_meta["page_height_pts"]
  89. img_sz_list = pdf_meta["image_info_per_page"]
  90. img_num_list = pdf_meta["imgs_per_page"]
  91. text_len_list = pdf_meta["text_len_per_page"]
  92. text_layout_list = pdf_meta["text_layout_per_page"]
  93. text_language = pdf_meta["text_language"]
  94. # allow_language = ['zh', 'en'] # 允许的语言,目前只允许简中和英文的
  95. # if text_language not in allow_language: # 如果语言不在允许的语言中,则drop
  96. # jso['need_drop'] = True
  97. # jso['drop_reason'] = DropReason.NOT_ALLOW_LANGUAGE
  98. # return jso
  99. pdf_path = pdf_meta["pdf_path"]
  100. is_encrypted = pdf_meta["is_encrypted"]
  101. is_needs_password = pdf_meta["is_needs_password"]
  102. if (
  103. is_encrypted or is_needs_password
  104. ): # 加密的,需要密码的,没有页面的,都不处理
  105. jso["_need_drop"] = True
  106. jso["_drop_reason"] = DropReason.ENCRYPTED
  107. else:
  108. start_time = time.time() # 记录开始时间
  109. is_text_pdf, results = classify(
  110. pdf_path,
  111. total_page,
  112. page_width,
  113. page_height,
  114. img_sz_list,
  115. text_len_list,
  116. img_num_list,
  117. text_layout_list,
  118. )
  119. classify_time = int(time.time() - start_time) # 计算执行时间
  120. if is_text_pdf:
  121. pdf_meta["is_text_pdf"] = is_text_pdf
  122. jso["_pdf_type"] = "TXT"
  123. jso["pdf_meta"] = pdf_meta
  124. jso["classify_time"] = classify_time
  125. # print(json.dumps(pdf_meta, ensure_ascii=False))
  126. allow_language = ["zh", "en"] # 允许的语言,目前只允许简中和英文的
  127. if (
  128. text_language not in allow_language
  129. ): # 如果语言不在允许的语言中,则drop
  130. jso["_need_drop"] = True
  131. jso["_drop_reason"] = DropReason.NOT_ALLOW_LANGUAGE
  132. return jso
  133. else:
  134. # 先不drop
  135. pdf_meta["is_text_pdf"] = is_text_pdf
  136. jso["_pdf_type"] = "OCR"
  137. jso["pdf_meta"] = pdf_meta
  138. jso["classify_time"] = classify_time
  139. # jso["_need_drop"] = True
  140. # jso["_drop_reason"] = DropReason.NOT_IS_TEXT_PDF
  141. extra_info = {"classify_rules": []}
  142. for condition, result in results.items():
  143. if not result:
  144. extra_info["classify_rules"].append(condition)
  145. jso["extra_info"] = extra_info
  146. except Exception as e:
  147. jso = exception_handler(jso, e)
  148. return jso
  149. def drop_needdrop_pdf(jso: dict) -> dict:
  150. if jso.get("_need_drop", False):
  151. logger.info(
  152. f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop",
  153. file=sys.stderr,
  154. )
  155. jso["dropped"] = True
  156. return jso
  157. def pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
  158. if debug_mode:
  159. pass
  160. else: # 如果debug没开,则检测是否有needdrop字段
  161. if jso.get("_need_drop", False):
  162. book_name = join_path(get_data_source(jso), jso["file_id"])
  163. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  164. jso["dropped"] = True
  165. return jso
  166. try:
  167. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  168. # 将 pdf_intermediate_dict 解压
  169. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  170. # markdown_content = mk_nlp_markdown(pdf_intermediate_dict)
  171. jso["content_list"] = mk_universal_format(pdf_intermediate_dict)
  172. # jso["content"] = markdown_content
  173. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']}")
  174. # 把无用的信息清空
  175. jso["doc_layout_result"] = ""
  176. jso["pdf_intermediate_dict"] = ""
  177. jso["pdf_meta"] = ""
  178. except Exception as e:
  179. jso = exception_handler(jso, e)
  180. return jso
  181. def parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  182. # 检测debug开关
  183. if debug_mode:
  184. pass
  185. else: # 如果debug没开,则检测是否有needdrop字段
  186. if jso.get("_need_drop", False):
  187. return jso
  188. # 开始正式逻辑
  189. s3_pdf_path = jso.get("file_location")
  190. s3_config = get_s3_config(s3_pdf_path)
  191. pdf_bytes = read_file(s3_pdf_path, s3_config)
  192. model_output_json_list = jso.get("doc_layout_result")
  193. data_source = get_data_source(jso)
  194. file_id = jso.get("file_id")
  195. book_name = f"{data_source}/{file_id}"
  196. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  197. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  198. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  199. max_svgs = max(svgs_per_page_list)
  200. if max_svgs > 3000:
  201. jso["_need_drop"] = True
  202. jso["_drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  203. else:
  204. try:
  205. save_path = s3_image_save_path
  206. image_s3_config = get_s3_config(save_path)
  207. start_time = time.time() # 记录开始时间
  208. # 先打印一下book_name和解析开始的时间
  209. logger.info(
  210. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  211. file=sys.stderr,
  212. )
  213. pdf_info_dict = parse_pdf_by_txt(
  214. pdf_bytes,
  215. model_output_json_list,
  216. save_path,
  217. book_name,
  218. pdf_model_profile=None,
  219. image_s3_config=image_s3_config,
  220. start_page_id=start_page_id,
  221. junk_img_bojids=junk_img_bojids,
  222. debug_mode=debug_mode,
  223. )
  224. if pdf_info_dict.get(
  225. "_need_drop", False
  226. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  227. jso["_need_drop"] = True
  228. jso["_drop_reason"] = pdf_info_dict["_drop_reason"]
  229. else: # 正常返回,将 pdf_info_dict 压缩并存储
  230. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  231. jso["pdf_intermediate_dict"] = pdf_info_dict
  232. end_time = time.time() # 记录完成时间
  233. parse_time = int(end_time - start_time) # 计算执行时间
  234. # 解析完成后打印一下book_name和耗时
  235. logger.info(
  236. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  237. file=sys.stderr,
  238. )
  239. jso["parse_time"] = parse_time
  240. except Exception as e:
  241. jso = exception_handler(jso, e)
  242. return jso
  243. def parse_pdf_for_model_train(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  244. # 检测debug开关
  245. if debug_mode:
  246. pass
  247. else: # 如果debug没开,则检测是否有needdrop字段
  248. if jso.get("_need_drop", False):
  249. return jso
  250. # 开始正式逻辑
  251. s3_pdf_path = jso.get("file_location")
  252. s3_config = get_s3_config(s3_pdf_path)
  253. model_output_json_list = jso.get("doc_layout_result")
  254. data_source = get_data_source(jso)
  255. file_id = jso.get("file_id")
  256. book_name = f"{data_source}/{file_id}"
  257. # 1.23.22已修复
  258. # if debug_mode:
  259. # pass
  260. # else:
  261. # if book_name == "zlib/zlib_21929367":
  262. # jso['need_drop'] = True
  263. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  264. # return jso
  265. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  266. # total_page = jso['pdf_meta']['total_page']
  267. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  268. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  269. max_svgs = max(svgs_per_page_list)
  270. if max_svgs > 3000:
  271. jso["_need_drop"] = True
  272. jso["_drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  273. # elif total_page > 1000:
  274. # jso['need_drop'] = True
  275. # jso['drop_reason'] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_TOTAL_PAGES
  276. else:
  277. try:
  278. save_path = s3_image_save_path
  279. image_s3_config = get_s3_config(save_path)
  280. start_time = time.time() # 记录开始时间
  281. # 先打印一下book_name和解析开始的时间
  282. logger.info(
  283. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  284. file=sys.stderr,
  285. )
  286. pdf_info_dict = parse_pdf_for_train(
  287. s3_pdf_path,
  288. s3_config,
  289. model_output_json_list,
  290. save_path,
  291. book_name,
  292. pdf_model_profile=None,
  293. image_s3_config=image_s3_config,
  294. start_page_id=start_page_id,
  295. junk_img_bojids=junk_img_bojids,
  296. debug_mode=debug_mode,
  297. )
  298. if pdf_info_dict.get(
  299. "_need_drop", False
  300. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  301. jso["_need_drop"] = True
  302. jso["_drop_reason"] = pdf_info_dict["_drop_reason"]
  303. else: # 正常返回,将 pdf_info_dict 压缩并存储
  304. jso["parsed_results"] = convert_to_train_format(pdf_info_dict)
  305. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  306. jso["pdf_intermediate_dict"] = pdf_info_dict
  307. end_time = time.time() # 记录完成时间
  308. parse_time = int(end_time - start_time) # 计算执行时间
  309. # 解析完成后打印一下book_name和耗时
  310. logger.info(
  311. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  312. file=sys.stderr,
  313. )
  314. jso["parse_time"] = parse_time
  315. except Exception as e:
  316. jso = exception_handler(jso, e)
  317. return jso
"""
Unified processing logic:
1. First call parse_pdf to process text-based PDFs.
2. Then call ocr_dropped_parse_pdf to process the PDFs that were dropped earlier.
"""
  323. # def uni_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  324. # jso = parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  325. # jso = ocr_dropped_parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  326. # return jso
  327. if __name__ == "__main__":
  328. pass