pipeline.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. # coding=utf8
  2. import sys
  3. import time
  4. from urllib.parse import quote
  5. from magic_pdf.libs.commons import (
  6. read_file,
  7. join_path,
  8. parse_bucket_key,
  9. formatted_time,
  10. s3_image_save_path,
  11. )
  12. from magic_pdf.libs.drop_reason import DropReason
  13. from magic_pdf.libs.json_compressor import JsonCompressor
  14. from magic_pdf.dict2md.mkcontent import mk_universal_format
  15. from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
  16. from magic_pdf.filter.pdf_classify_by_type import classify
  17. from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
  18. from loguru import logger
  19. from magic_pdf.pdf_parse_for_train import parse_pdf_for_train
  20. from magic_pdf.spark.base import exception_handler, get_data_source
  21. from magic_pdf.train_utils.convert_to_train_format import convert_to_train_format
  22. from magic_pdf.spark.s3 import get_s3_config, get_s3_client
  23. def meta_scan(jso: dict, doc_layout_check=True) -> dict:
  24. s3_pdf_path = jso.get("file_location")
  25. s3_config = get_s3_config(s3_pdf_path)
  26. if doc_layout_check:
  27. if (
  28. "doc_layout_result" not in jso
  29. ): # 检测json中是存在模型数据,如果没有则需要跳过该pdf
  30. jso["need_drop"] = True
  31. jso["drop_reason"] = DropReason.MISS_DOC_LAYOUT_RESULT
  32. return jso
  33. try:
  34. data_source = get_data_source(jso)
  35. file_id = jso.get("file_id")
  36. book_name = f"{data_source}/{file_id}"
  37. # 首页存在超量drawing问题
  38. # special_pdf_list = ['zlib/zlib_21822650']
  39. # if book_name in special_pdf_list:
  40. # jso['need_drop'] = True
  41. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  42. # return jso
  43. start_time = time.time() # 记录开始时间
  44. logger.info(
  45. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  46. file=sys.stderr,
  47. )
  48. file_content = read_file(s3_pdf_path, s3_config)
  49. read_file_time = int(time.time() - start_time) # 计算执行时间
  50. start_time = time.time() # 记录开始时间
  51. res = pdf_meta_scan(s3_pdf_path, file_content)
  52. if res.get(
  53. "need_drop", False
  54. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  55. jso["need_drop"] = True
  56. jso["drop_reason"] = res["drop_reason"]
  57. else: # 正常返回
  58. jso["pdf_meta"] = res
  59. jso["content"] = ""
  60. jso["remark"] = ""
  61. jso["data_url"] = ""
  62. end_time = time.time() # 记录结束时间
  63. meta_scan_time = int(end_time - start_time) # 计算执行时间
  64. logger.info(
  65. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},read_file_time is:{read_file_time},meta_scan_time is:{meta_scan_time}",
  66. file=sys.stderr,
  67. )
  68. jso["read_file_time"] = read_file_time
  69. jso["meta_scan_time"] = meta_scan_time
  70. except Exception as e:
  71. jso = exception_handler(jso, e)
  72. return jso
  73. def classify_by_type(jso: dict, debug_mode=False) -> dict:
  74. # 检测debug开关
  75. if debug_mode:
  76. pass
  77. else: # 如果debug没开,则检测是否有needdrop字段
  78. if jso.get("need_drop", False):
  79. return jso
  80. # 开始正式逻辑
  81. try:
  82. pdf_meta = jso.get("pdf_meta")
  83. data_source = get_data_source(jso)
  84. file_id = jso.get("file_id")
  85. book_name = f"{data_source}/{file_id}"
  86. total_page = pdf_meta["total_page"]
  87. page_width = pdf_meta["page_width_pts"]
  88. page_height = pdf_meta["page_height_pts"]
  89. img_sz_list = pdf_meta["image_info_per_page"]
  90. img_num_list = pdf_meta["imgs_per_page"]
  91. text_len_list = pdf_meta["text_len_per_page"]
  92. text_layout_list = pdf_meta["text_layout_per_page"]
  93. text_language = pdf_meta["text_language"]
  94. # allow_language = ['zh', 'en'] # 允许的语言,目前只允许简中和英文的
  95. # if text_language not in allow_language: # 如果语言不在允许的语言中,则drop
  96. # jso['need_drop'] = True
  97. # jso['drop_reason'] = DropReason.NOT_ALLOW_LANGUAGE
  98. # return jso
  99. pdf_path = pdf_meta["pdf_path"]
  100. is_encrypted = pdf_meta["is_encrypted"]
  101. is_needs_password = pdf_meta["is_needs_password"]
  102. if (
  103. is_encrypted or is_needs_password
  104. ): # 加密的,需要密码的,没有页面的,都不处理
  105. jso["need_drop"] = True
  106. jso["drop_reason"] = DropReason.ENCRYPTED
  107. else:
  108. start_time = time.time() # 记录开始时间
  109. is_text_pdf, results = classify(
  110. pdf_path,
  111. total_page,
  112. page_width,
  113. page_height,
  114. img_sz_list,
  115. text_len_list,
  116. img_num_list,
  117. text_layout_list,
  118. )
  119. classify_time = int(time.time() - start_time) # 计算执行时间
  120. if is_text_pdf:
  121. pdf_meta["is_text_pdf"] = is_text_pdf
  122. jso["_pdf_type"] = "TXT"
  123. jso["pdf_meta"] = pdf_meta
  124. jso["classify_time"] = classify_time
  125. # print(json.dumps(pdf_meta, ensure_ascii=False))
  126. allow_language = ["zh", "en"] # 允许的语言,目前只允许简中和英文的
  127. if (
  128. text_language not in allow_language
  129. ): # 如果语言不在允许的语言中,则drop
  130. jso["need_drop"] = True
  131. jso["drop_reason"] = DropReason.NOT_ALLOW_LANGUAGE
  132. return jso
  133. else:
  134. # 先不drop
  135. pdf_meta["is_text_pdf"] = is_text_pdf
  136. jso["_pdf_type"] = "OCR"
  137. jso["pdf_meta"] = pdf_meta
  138. jso["classify_time"] = classify_time
  139. # jso["need_drop"] = True
  140. # jso["drop_reason"] = DropReason.NOT_IS_TEXT_PDF
  141. extra_info = {"classify_rules": []}
  142. for condition, result in results.items():
  143. if not result:
  144. extra_info["classify_rules"].append(condition)
  145. jso["extra_info"] = extra_info
  146. except Exception as e:
  147. jso = exception_handler(jso, e)
  148. return jso
  149. def save_tables_to_s3(jso: dict, debug_mode=False) -> dict:
  150. if debug_mode:
  151. pass
  152. else: # 如果debug没开,则检测是否有needdrop字段
  153. if jso.get("need_drop", False):
  154. logger.info(
  155. f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop",
  156. file=sys.stderr,
  157. )
  158. jso["dropped"] = True
  159. return jso
  160. try:
  161. data_source = get_data_source(jso)
  162. file_id = jso.get("file_id")
  163. book_name = f"{data_source}/{file_id}"
  164. title = jso.get("title")
  165. url_encode_title = quote(title, safe="")
  166. if data_source != "scihub":
  167. return jso
  168. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  169. # 将 pdf_intermediate_dict 解压
  170. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  171. i = 0
  172. for page in pdf_intermediate_dict.values():
  173. if page.get("tables"):
  174. if len(page["tables"]) > 0:
  175. j = 0
  176. for table in page["tables"]:
  177. if debug_mode:
  178. image_path = join_path(
  179. "s3://mllm-raw-media/pdf2md_img/",
  180. book_name,
  181. table["image_path"],
  182. )
  183. else:
  184. image_path = join_path(
  185. "s3://mllm-raw-media/pdf2md_img/", table["image_path"]
  186. )
  187. if image_path.endswith(".jpg"):
  188. j += 1
  189. s3_client = get_s3_client(image_path)
  190. bucket_name, bucket_key = parse_bucket_key(image_path)
  191. # 通过s3_client获取图片到内存
  192. image_bytes = s3_client.get_object(
  193. Bucket=bucket_name, Key=bucket_key
  194. )["Body"].read()
  195. # 保存图片到新的位置
  196. if debug_mode:
  197. new_image_path = join_path(
  198. "s3://mllm-raw-media/pdf2md_img/table_new/",
  199. url_encode_title
  200. + "_"
  201. + table["image_path"].lstrip("tables/"),
  202. )
  203. else:
  204. new_image_path = join_path(
  205. "s3://mllm-raw-media/pdf2md_img/table_new/",
  206. url_encode_title + f"_page{i}_{j}.jpg",
  207. )
  208. logger.info(new_image_path, file=sys.stderr)
  209. bucket_name, bucket_key = parse_bucket_key(new_image_path)
  210. s3_client.put_object(
  211. Bucket=bucket_name, Key=bucket_key, Body=image_bytes
  212. )
  213. else:
  214. continue
  215. i += 1
  216. # 把无用的信息清空
  217. jso["doc_layout_result"] = ""
  218. jso["pdf_intermediate_dict"] = ""
  219. jso["pdf_meta"] = ""
  220. except Exception as e:
  221. jso = exception_handler(jso, e)
  222. return jso
  223. def drop_needdrop_pdf(jso: dict) -> dict:
  224. if jso.get("need_drop", False):
  225. logger.info(
  226. f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop",
  227. file=sys.stderr,
  228. )
  229. jso["dropped"] = True
  230. return jso
  231. def pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
  232. if debug_mode:
  233. pass
  234. else: # 如果debug没开,则检测是否有needdrop字段
  235. if jso.get("need_drop", False):
  236. book_name = join_path(get_data_source(jso), jso["file_id"])
  237. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  238. jso["dropped"] = True
  239. return jso
  240. try:
  241. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  242. # 将 pdf_intermediate_dict 解压
  243. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  244. # markdown_content = mk_nlp_markdown(pdf_intermediate_dict)
  245. jso["content_list"] = mk_universal_format(pdf_intermediate_dict)
  246. # jso["content"] = markdown_content
  247. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']}")
  248. # 把无用的信息清空
  249. jso["doc_layout_result"] = ""
  250. jso["pdf_intermediate_dict"] = ""
  251. jso["pdf_meta"] = ""
  252. except Exception as e:
  253. jso = exception_handler(jso, e)
  254. return jso
  255. def parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  256. # 检测debug开关
  257. if debug_mode:
  258. pass
  259. else: # 如果debug没开,则检测是否有needdrop字段
  260. if jso.get("need_drop", False):
  261. return jso
  262. # 开始正式逻辑
  263. s3_pdf_path = jso.get("file_location")
  264. s3_config = get_s3_config(s3_pdf_path)
  265. pdf_bytes = read_file(s3_pdf_path, s3_config)
  266. model_output_json_list = jso.get("doc_layout_result")
  267. data_source = get_data_source(jso)
  268. file_id = jso.get("file_id")
  269. book_name = f"{data_source}/{file_id}"
  270. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  271. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  272. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  273. max_svgs = max(svgs_per_page_list)
  274. if max_svgs > 3000:
  275. jso["need_drop"] = True
  276. jso["drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  277. else:
  278. try:
  279. save_path = s3_image_save_path
  280. image_s3_config = get_s3_config(save_path)
  281. start_time = time.time() # 记录开始时间
  282. # 先打印一下book_name和解析开始的时间
  283. logger.info(
  284. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  285. file=sys.stderr,
  286. )
  287. pdf_info_dict = parse_pdf_by_txt(
  288. pdf_bytes,
  289. model_output_json_list,
  290. save_path,
  291. book_name,
  292. pdf_model_profile=None,
  293. image_s3_config=image_s3_config,
  294. start_page_id=start_page_id,
  295. junk_img_bojids=junk_img_bojids,
  296. debug_mode=debug_mode,
  297. )
  298. if pdf_info_dict.get(
  299. "need_drop", False
  300. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  301. jso["need_drop"] = True
  302. jso["drop_reason"] = pdf_info_dict["drop_reason"]
  303. else: # 正常返回,将 pdf_info_dict 压缩并存储
  304. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  305. jso["pdf_intermediate_dict"] = pdf_info_dict
  306. end_time = time.time() # 记录完成时间
  307. parse_time = int(end_time - start_time) # 计算执行时间
  308. # 解析完成后打印一下book_name和耗时
  309. logger.info(
  310. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  311. file=sys.stderr,
  312. )
  313. jso["parse_time"] = parse_time
  314. except Exception as e:
  315. jso = exception_handler(jso, e)
  316. return jso
  317. def parse_pdf_for_model_train(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  318. # 检测debug开关
  319. if debug_mode:
  320. pass
  321. else: # 如果debug没开,则检测是否有needdrop字段
  322. if jso.get("need_drop", False):
  323. return jso
  324. # 开始正式逻辑
  325. s3_pdf_path = jso.get("file_location")
  326. s3_config = get_s3_config(s3_pdf_path)
  327. model_output_json_list = jso.get("doc_layout_result")
  328. data_source = get_data_source(jso)
  329. file_id = jso.get("file_id")
  330. book_name = f"{data_source}/{file_id}"
  331. # 1.23.22已修复
  332. # if debug_mode:
  333. # pass
  334. # else:
  335. # if book_name == "zlib/zlib_21929367":
  336. # jso['need_drop'] = True
  337. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  338. # return jso
  339. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  340. # total_page = jso['pdf_meta']['total_page']
  341. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  342. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  343. max_svgs = max(svgs_per_page_list)
  344. if max_svgs > 3000:
  345. jso["need_drop"] = True
  346. jso["drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  347. # elif total_page > 1000:
  348. # jso['need_drop'] = True
  349. # jso['drop_reason'] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_TOTAL_PAGES
  350. else:
  351. try:
  352. save_path = s3_image_save_path
  353. image_s3_config = get_s3_config(save_path)
  354. start_time = time.time() # 记录开始时间
  355. # 先打印一下book_name和解析开始的时间
  356. logger.info(
  357. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  358. file=sys.stderr,
  359. )
  360. pdf_info_dict = parse_pdf_for_train(
  361. s3_pdf_path,
  362. s3_config,
  363. model_output_json_list,
  364. save_path,
  365. book_name,
  366. pdf_model_profile=None,
  367. image_s3_config=image_s3_config,
  368. start_page_id=start_page_id,
  369. junk_img_bojids=junk_img_bojids,
  370. debug_mode=debug_mode,
  371. )
  372. if pdf_info_dict.get(
  373. "need_drop", False
  374. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  375. jso["need_drop"] = True
  376. jso["drop_reason"] = pdf_info_dict["drop_reason"]
  377. else: # 正常返回,将 pdf_info_dict 压缩并存储
  378. jso["parsed_results"] = convert_to_train_format(pdf_info_dict)
  379. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  380. jso["pdf_intermediate_dict"] = pdf_info_dict
  381. end_time = time.time() # 记录完成时间
  382. parse_time = int(end_time - start_time) # 计算执行时间
  383. # 解析完成后打印一下book_name和耗时
  384. logger.info(
  385. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  386. file=sys.stderr,
  387. )
  388. jso["parse_time"] = parse_time
  389. except Exception as e:
  390. jso = exception_handler(jso, e)
  391. return jso
  392. """
  393. 统一处理逻辑
  394. 1.先调用parse_pdf对文本类pdf进行处理
  395. 2.再调用ocr_dropped_parse_pdf,对之前drop的pdf进行处理
  396. """
  397. # def uni_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  398. # jso = parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  399. # jso = ocr_dropped_parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  400. # return jso
  401. if __name__ == "__main__":
  402. pass