pipeline.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. # coding=utf8
  2. import sys
  3. import time
  4. from urllib.parse import quote
  5. from magic_pdf.libs.commons import (
  6. read_file,
  7. join_path,
  8. parse_bucket_key,
  9. formatted_time,
  10. s3_image_save_path,
  11. )
  12. from magic_pdf.libs.drop_reason import DropReason
  13. from magic_pdf.libs.json_compressor import JsonCompressor
  14. from magic_pdf.dict2md.mkcontent import mk_universal_format
  15. from magic_pdf.pdf_parse_by_model import parse_pdf_by_model
  16. from magic_pdf.filter.pdf_classify_by_type import classify
  17. from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
  18. from loguru import logger
  19. from magic_pdf.pdf_parse_for_train import parse_pdf_for_train
  20. from magic_pdf.spark import exception_handler, get_data_source
  21. from magic_pdf.train_utils.convert_to_train_format import convert_to_train_format
  22. from magic_pdf.spark import get_s3_config, get_s3_client
  23. def meta_scan(jso: dict, doc_layout_check=True) -> dict:
  24. s3_pdf_path = jso.get("file_location")
  25. s3_config = get_s3_config(s3_pdf_path)
  26. if doc_layout_check:
  27. if (
  28. "doc_layout_result" not in jso
  29. ): # 检测json中是存在模型数据,如果没有则需要跳过该pdf
  30. jso["need_drop"] = True
  31. jso["drop_reason"] = DropReason.MISS_DOC_LAYOUT_RESULT
  32. return jso
  33. try:
  34. data_source = get_data_source(jso)
  35. file_id = jso.get("file_id")
  36. book_name = f"{data_source}/{file_id}"
  37. # 首页存在超量drawing问题
  38. # special_pdf_list = ['zlib/zlib_21822650']
  39. # if book_name in special_pdf_list:
  40. # jso['need_drop'] = True
  41. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  42. # return jso
  43. start_time = time.time() # 记录开始时间
  44. logger.info(
  45. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  46. file=sys.stderr,
  47. )
  48. file_content = read_file(s3_pdf_path, s3_config)
  49. read_file_time = int(time.time() - start_time) # 计算执行时间
  50. start_time = time.time() # 记录开始时间
  51. res = pdf_meta_scan(s3_pdf_path, file_content)
  52. if res.get(
  53. "need_drop", False
  54. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  55. jso["need_drop"] = True
  56. jso["drop_reason"] = res["drop_reason"]
  57. else: # 正常返回
  58. jso["pdf_meta"] = res
  59. jso["content"] = ""
  60. jso["remark"] = ""
  61. jso["data_url"] = ""
  62. end_time = time.time() # 记录结束时间
  63. meta_scan_time = int(end_time - start_time) # 计算执行时间
  64. logger.info(
  65. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},read_file_time is:{read_file_time},meta_scan_time is:{meta_scan_time}",
  66. file=sys.stderr,
  67. )
  68. jso["read_file_time"] = read_file_time
  69. jso["meta_scan_time"] = meta_scan_time
  70. except Exception as e:
  71. jso = exception_handler(jso, e)
  72. return jso
  73. def classify_by_type(jso: dict, debug_mode=False) -> dict:
  74. # 检测debug开关
  75. if debug_mode:
  76. pass
  77. else: # 如果debug没开,则检测是否有needdrop字段
  78. if jso.get("need_drop", False):
  79. return jso
  80. # 开始正式逻辑
  81. try:
  82. pdf_meta = jso.get("pdf_meta")
  83. data_source = get_data_source(jso)
  84. file_id = jso.get("file_id")
  85. book_name = f"{data_source}/{file_id}"
  86. total_page = pdf_meta["total_page"]
  87. page_width = pdf_meta["page_width_pts"]
  88. page_height = pdf_meta["page_height_pts"]
  89. img_sz_list = pdf_meta["image_info_per_page"]
  90. img_num_list = pdf_meta["imgs_per_page"]
  91. text_len_list = pdf_meta["text_len_per_page"]
  92. text_layout_list = pdf_meta["text_layout_per_page"]
  93. text_language = pdf_meta["text_language"]
  94. # allow_language = ['zh', 'en'] # 允许的语言,目前只允许简中和英文的
  95. # if text_language not in allow_language: # 如果语言不在允许的语言中,则drop
  96. # jso['need_drop'] = True
  97. # jso['drop_reason'] = DropReason.NOT_ALLOW_LANGUAGE
  98. # return jso
  99. pdf_path = pdf_meta["pdf_path"]
  100. is_encrypted = pdf_meta["is_encrypted"]
  101. is_needs_password = pdf_meta["is_needs_password"]
  102. if (
  103. is_encrypted or is_needs_password
  104. ): # 加密的,需要密码的,没有页面的,都不处理
  105. jso["need_drop"] = True
  106. jso["drop_reason"] = DropReason.ENCRYPTED
  107. else:
  108. start_time = time.time() # 记录开始时间
  109. is_text_pdf, results = classify(
  110. pdf_path,
  111. total_page,
  112. page_width,
  113. page_height,
  114. img_sz_list,
  115. text_len_list,
  116. img_num_list,
  117. text_layout_list,
  118. )
  119. classify_time = int(time.time() - start_time) # 计算执行时间
  120. if is_text_pdf:
  121. pdf_meta["is_text_pdf"] = is_text_pdf
  122. jso["pdf_meta"] = pdf_meta
  123. jso["classify_time"] = classify_time
  124. # print(json.dumps(pdf_meta, ensure_ascii=False))
  125. allow_language = ["zh", "en"] # 允许的语言,目前只允许简中和英文的
  126. if (
  127. text_language not in allow_language
  128. ): # 如果语言不在允许的语言中,则drop
  129. jso["need_drop"] = True
  130. jso["drop_reason"] = DropReason.NOT_ALLOW_LANGUAGE
  131. return jso
  132. else:
  133. # 先不drop
  134. pdf_meta["is_text_pdf"] = is_text_pdf
  135. jso["pdf_meta"] = pdf_meta
  136. jso["classify_time"] = classify_time
  137. jso["need_drop"] = True
  138. jso["drop_reason"] = DropReason.NOT_IS_TEXT_PDF
  139. extra_info = {"classify_rules": []}
  140. for condition, result in results.items():
  141. if not result:
  142. extra_info["classify_rules"].append(condition)
  143. jso["extra_info"] = extra_info
  144. except Exception as e:
  145. jso = exception_handler(jso, e)
  146. return jso
  147. def save_tables_to_s3(jso: dict, debug_mode=False) -> dict:
  148. if debug_mode:
  149. pass
  150. else: # 如果debug没开,则检测是否有needdrop字段
  151. if jso.get("need_drop", False):
  152. logger.info(
  153. f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop",
  154. file=sys.stderr,
  155. )
  156. jso["dropped"] = True
  157. return jso
  158. try:
  159. data_source = get_data_source(jso)
  160. file_id = jso.get("file_id")
  161. book_name = f"{data_source}/{file_id}"
  162. title = jso.get("title")
  163. url_encode_title = quote(title, safe="")
  164. if data_source != "scihub":
  165. return jso
  166. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  167. # 将 pdf_intermediate_dict 解压
  168. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  169. i = 0
  170. for page in pdf_intermediate_dict.values():
  171. if page.get("tables"):
  172. if len(page["tables"]) > 0:
  173. j = 0
  174. for table in page["tables"]:
  175. if debug_mode:
  176. image_path = join_path(
  177. "s3://mllm-raw-media/pdf2md_img/",
  178. book_name,
  179. table["image_path"],
  180. )
  181. else:
  182. image_path = join_path(
  183. "s3://mllm-raw-media/pdf2md_img/", table["image_path"]
  184. )
  185. if image_path.endswith(".jpg"):
  186. j += 1
  187. s3_client = get_s3_client(image_path)
  188. bucket_name, bucket_key = parse_bucket_key(image_path)
  189. # 通过s3_client获取图片到内存
  190. image_bytes = s3_client.get_object(
  191. Bucket=bucket_name, Key=bucket_key
  192. )["Body"].read()
  193. # 保存图片到新的位置
  194. if debug_mode:
  195. new_image_path = join_path(
  196. "s3://mllm-raw-media/pdf2md_img/table_new/",
  197. url_encode_title
  198. + "_"
  199. + table["image_path"].lstrip("tables/"),
  200. )
  201. else:
  202. new_image_path = join_path(
  203. "s3://mllm-raw-media/pdf2md_img/table_new/",
  204. url_encode_title + f"_page{i}_{j}.jpg",
  205. )
  206. logger.info(new_image_path, file=sys.stderr)
  207. bucket_name, bucket_key = parse_bucket_key(new_image_path)
  208. s3_client.put_object(
  209. Bucket=bucket_name, Key=bucket_key, Body=image_bytes
  210. )
  211. else:
  212. continue
  213. i += 1
  214. # 把无用的信息清空
  215. jso["doc_layout_result"] = ""
  216. jso["pdf_intermediate_dict"] = ""
  217. jso["pdf_meta"] = ""
  218. except Exception as e:
  219. jso = exception_handler(jso, e)
  220. return jso
  221. def drop_needdrop_pdf(jso: dict) -> dict:
  222. if jso.get("need_drop", False):
  223. logger.info(
  224. f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop",
  225. file=sys.stderr,
  226. )
  227. jso["dropped"] = True
  228. return jso
  229. def pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
  230. if debug_mode:
  231. pass
  232. else: # 如果debug没开,则检测是否有needdrop字段
  233. if jso.get("need_drop", False):
  234. book_name = join_path(get_data_source(jso), jso["file_id"])
  235. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  236. jso["dropped"] = True
  237. return jso
  238. try:
  239. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  240. # 将 pdf_intermediate_dict 解压
  241. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  242. # markdown_content = mk_nlp_markdown(pdf_intermediate_dict)
  243. jso["content_list"] = mk_universal_format(pdf_intermediate_dict)
  244. # jso["content"] = markdown_content
  245. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']}")
  246. # 把无用的信息清空
  247. jso["doc_layout_result"] = ""
  248. jso["pdf_intermediate_dict"] = ""
  249. jso["pdf_meta"] = ""
  250. except Exception as e:
  251. jso = exception_handler(jso, e)
  252. return jso
  253. def parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  254. # 检测debug开关
  255. if debug_mode:
  256. pass
  257. else: # 如果debug没开,则检测是否有needdrop字段
  258. if jso.get("need_drop", False):
  259. return jso
  260. # 开始正式逻辑
  261. s3_pdf_path = jso.get("file_location")
  262. s3_config = get_s3_config(s3_pdf_path)
  263. pdf_bytes = read_file(s3_pdf_path, s3_config)
  264. model_output_json_list = jso.get("doc_layout_result")
  265. data_source = get_data_source(jso)
  266. file_id = jso.get("file_id")
  267. book_name = f"{data_source}/{file_id}"
  268. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  269. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  270. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  271. max_svgs = max(svgs_per_page_list)
  272. if max_svgs > 3000:
  273. jso["need_drop"] = True
  274. jso["drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  275. else:
  276. try:
  277. save_path = s3_image_save_path
  278. image_s3_config = get_s3_config(save_path)
  279. start_time = time.time() # 记录开始时间
  280. # 先打印一下book_name和解析开始的时间
  281. logger.info(
  282. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  283. file=sys.stderr,
  284. )
  285. pdf_info_dict = parse_pdf_by_model(
  286. pdf_bytes,
  287. model_output_json_list,
  288. save_path,
  289. book_name,
  290. pdf_model_profile=None,
  291. image_s3_config=image_s3_config,
  292. start_page_id=start_page_id,
  293. junk_img_bojids=junk_img_bojids,
  294. debug_mode=debug_mode,
  295. )
  296. if pdf_info_dict.get(
  297. "need_drop", False
  298. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  299. jso["need_drop"] = True
  300. jso["drop_reason"] = pdf_info_dict["drop_reason"]
  301. else: # 正常返回,将 pdf_info_dict 压缩并存储
  302. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  303. jso["pdf_intermediate_dict"] = pdf_info_dict
  304. end_time = time.time() # 记录完成时间
  305. parse_time = int(end_time - start_time) # 计算执行时间
  306. # 解析完成后打印一下book_name和耗时
  307. logger.info(
  308. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  309. file=sys.stderr,
  310. )
  311. jso["parse_time"] = parse_time
  312. except Exception as e:
  313. jso = exception_handler(jso, e)
  314. return jso
  315. def parse_pdf_for_model_train(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  316. # 检测debug开关
  317. if debug_mode:
  318. pass
  319. else: # 如果debug没开,则检测是否有needdrop字段
  320. if jso.get("need_drop", False):
  321. return jso
  322. # 开始正式逻辑
  323. s3_pdf_path = jso.get("file_location")
  324. s3_config = get_s3_config(s3_pdf_path)
  325. model_output_json_list = jso.get("doc_layout_result")
  326. data_source = get_data_source(jso)
  327. file_id = jso.get("file_id")
  328. book_name = f"{data_source}/{file_id}"
  329. # 1.23.22已修复
  330. # if debug_mode:
  331. # pass
  332. # else:
  333. # if book_name == "zlib/zlib_21929367":
  334. # jso['need_drop'] = True
  335. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  336. # return jso
  337. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  338. # total_page = jso['pdf_meta']['total_page']
  339. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  340. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  341. max_svgs = max(svgs_per_page_list)
  342. if max_svgs > 3000:
  343. jso["need_drop"] = True
  344. jso["drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  345. # elif total_page > 1000:
  346. # jso['need_drop'] = True
  347. # jso['drop_reason'] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_TOTAL_PAGES
  348. else:
  349. try:
  350. save_path = s3_image_save_path
  351. image_s3_config = get_s3_config(save_path)
  352. start_time = time.time() # 记录开始时间
  353. # 先打印一下book_name和解析开始的时间
  354. logger.info(
  355. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  356. file=sys.stderr,
  357. )
  358. pdf_info_dict = parse_pdf_for_train(
  359. s3_pdf_path,
  360. s3_config,
  361. model_output_json_list,
  362. save_path,
  363. book_name,
  364. pdf_model_profile=None,
  365. image_s3_config=image_s3_config,
  366. start_page_id=start_page_id,
  367. junk_img_bojids=junk_img_bojids,
  368. debug_mode=debug_mode,
  369. )
  370. if pdf_info_dict.get(
  371. "need_drop", False
  372. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  373. jso["need_drop"] = True
  374. jso["drop_reason"] = pdf_info_dict["drop_reason"]
  375. else: # 正常返回,将 pdf_info_dict 压缩并存储
  376. jso["parsed_results"] = convert_to_train_format(pdf_info_dict)
  377. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  378. jso["pdf_intermediate_dict"] = pdf_info_dict
  379. end_time = time.time() # 记录完成时间
  380. parse_time = int(end_time - start_time) # 计算执行时间
  381. # 解析完成后打印一下book_name和耗时
  382. logger.info(
  383. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  384. file=sys.stderr,
  385. )
  386. jso["parse_time"] = parse_time
  387. except Exception as e:
  388. jso = exception_handler(jso, e)
  389. return jso
"""
Unified processing logic
1. First call parse_pdf to process text-based PDFs
2. Then call ocr_dropped_parse_pdf to process the PDFs that were dropped earlier
"""
  395. # def uni_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  396. # jso = parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  397. # jso = ocr_dropped_parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  398. # return jso
# Script entry point: running this module directly does nothing.
if __name__ == "__main__":
    pass