pipeline.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. # coding=utf8
  2. import sys
  3. import time
  4. from urllib.parse import quote
  5. from magic_pdf.libs.commons import (
  6. read_file,
  7. join_path,
  8. parse_bucket_key,
  9. formatted_time,
  10. s3_image_save_path,
  11. )
  12. from magic_pdf.libs.drop_reason import DropReason
  13. from magic_pdf.libs.json_compressor import JsonCompressor
  14. from magic_pdf.dict2md.mkcontent import mk_nlp_markdown, mk_universal_format
  15. from magic_pdf.ocr_pipeline import ocr_dropped_parse_pdf
  16. from magic_pdf.pdf_parse_by_model import parse_pdf_by_model
  17. from magic_pdf.filter.pdf_classify_by_type import classify
  18. from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
  19. from loguru import logger
  20. from magic_pdf.pdf_parse_for_train import parse_pdf_for_train
  21. from magic_pdf.train_utils.convert_to_train_format import convert_to_train_format
  22. from app.common.s3 import get_s3_config, get_s3_client
  23. def exception_handler(jso: dict, e):
  24. logger.exception(e)
  25. jso["need_drop"] = True
  26. jso["drop_reason"] = DropReason.Exception
  27. jso["exception"] = f"ERROR: {e}"
  28. return jso
  29. def get_data_type(jso: dict):
  30. data_type = jso.get("data_type")
  31. if data_type is None:
  32. data_type = jso.get("file_type")
  33. return data_type
  34. def get_bookid(jso: dict):
  35. book_id = jso.get("bookid")
  36. if book_id is None:
  37. book_id = jso.get("original_file_id")
  38. return book_id
  39. def get_data_source(jso: dict):
  40. data_source = jso.get("data_source")
  41. if data_source is None:
  42. data_source = jso.get("file_source")
  43. return data_source
  44. def meta_scan(jso: dict, doc_layout_check=True) -> dict:
  45. s3_pdf_path = jso.get("file_location")
  46. s3_config = get_s3_config(s3_pdf_path)
  47. if doc_layout_check:
  48. if (
  49. "doc_layout_result" not in jso
  50. ): # 检测json中是存在模型数据,如果没有则需要跳过该pdf
  51. jso["need_drop"] = True
  52. jso["drop_reason"] = DropReason.MISS_DOC_LAYOUT_RESULT
  53. return jso
  54. try:
  55. data_source = get_data_source(jso)
  56. file_id = jso.get("file_id")
  57. book_name = f"{data_source}/{file_id}"
  58. # 首页存在超量drawing问题
  59. # special_pdf_list = ['zlib/zlib_21822650']
  60. # if book_name in special_pdf_list:
  61. # jso['need_drop'] = True
  62. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  63. # return jso
  64. start_time = time.time() # 记录开始时间
  65. logger.info(
  66. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  67. file=sys.stderr,
  68. )
  69. file_content = read_file(s3_pdf_path, s3_config)
  70. read_file_time = int(time.time() - start_time) # 计算执行时间
  71. start_time = time.time() # 记录开始时间
  72. res = pdf_meta_scan(s3_pdf_path, file_content)
  73. if res.get(
  74. "need_drop", False
  75. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  76. jso["need_drop"] = True
  77. jso["drop_reason"] = res["drop_reason"]
  78. else: # 正常返回
  79. jso["pdf_meta"] = res
  80. jso["content"] = ""
  81. jso["remark"] = ""
  82. jso["data_url"] = ""
  83. end_time = time.time() # 记录结束时间
  84. meta_scan_time = int(end_time - start_time) # 计算执行时间
  85. logger.info(
  86. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},read_file_time is:{read_file_time},meta_scan_time is:{meta_scan_time}",
  87. file=sys.stderr,
  88. )
  89. jso["read_file_time"] = read_file_time
  90. jso["meta_scan_time"] = meta_scan_time
  91. except Exception as e:
  92. jso = exception_handler(jso, e)
  93. return jso
  94. def classify_by_type(jso: dict, debug_mode=False) -> dict:
  95. # 检测debug开关
  96. if debug_mode:
  97. pass
  98. else: # 如果debug没开,则检测是否有needdrop字段
  99. if jso.get("need_drop", False):
  100. return jso
  101. # 开始正式逻辑
  102. try:
  103. pdf_meta = jso.get("pdf_meta")
  104. data_source = get_data_source(jso)
  105. file_id = jso.get("file_id")
  106. book_name = f"{data_source}/{file_id}"
  107. total_page = pdf_meta["total_page"]
  108. page_width = pdf_meta["page_width_pts"]
  109. page_height = pdf_meta["page_height_pts"]
  110. img_sz_list = pdf_meta["image_info_per_page"]
  111. img_num_list = pdf_meta["imgs_per_page"]
  112. text_len_list = pdf_meta["text_len_per_page"]
  113. text_layout_list = pdf_meta["text_layout_per_page"]
  114. text_language = pdf_meta["text_language"]
  115. # allow_language = ['zh', 'en'] # 允许的语言,目前只允许简中和英文的
  116. # if text_language not in allow_language: # 如果语言不在允许的语言中,则drop
  117. # jso['need_drop'] = True
  118. # jso['drop_reason'] = DropReason.NOT_ALLOW_LANGUAGE
  119. # return jso
  120. pdf_path = pdf_meta["pdf_path"]
  121. is_encrypted = pdf_meta["is_encrypted"]
  122. is_needs_password = pdf_meta["is_needs_password"]
  123. if (
  124. is_encrypted or is_needs_password
  125. ): # 加密的,需要密码的,没有页面的,都不处理
  126. jso["need_drop"] = True
  127. jso["drop_reason"] = DropReason.ENCRYPTED
  128. else:
  129. start_time = time.time() # 记录开始时间
  130. is_text_pdf, results = classify(
  131. pdf_path,
  132. total_page,
  133. page_width,
  134. page_height,
  135. img_sz_list,
  136. text_len_list,
  137. img_num_list,
  138. text_layout_list,
  139. )
  140. classify_time = int(time.time() - start_time) # 计算执行时间
  141. if is_text_pdf:
  142. pdf_meta["is_text_pdf"] = is_text_pdf
  143. jso["pdf_meta"] = pdf_meta
  144. jso["classify_time"] = classify_time
  145. # print(json.dumps(pdf_meta, ensure_ascii=False))
  146. allow_language = ["zh", "en"] # 允许的语言,目前只允许简中和英文的
  147. if (
  148. text_language not in allow_language
  149. ): # 如果语言不在允许的语言中,则drop
  150. jso["need_drop"] = True
  151. jso["drop_reason"] = DropReason.NOT_ALLOW_LANGUAGE
  152. return jso
  153. else:
  154. # 先不drop
  155. pdf_meta["is_text_pdf"] = is_text_pdf
  156. jso["pdf_meta"] = pdf_meta
  157. jso["classify_time"] = classify_time
  158. jso["need_drop"] = True
  159. jso["drop_reason"] = DropReason.NOT_IS_TEXT_PDF
  160. extra_info = {"classify_rules": []}
  161. for condition, result in results.items():
  162. if not result:
  163. extra_info["classify_rules"].append(condition)
  164. jso["extra_info"] = extra_info
  165. except Exception as e:
  166. jso = exception_handler(jso, e)
  167. return jso
  168. def save_tables_to_s3(jso: dict, debug_mode=False) -> dict:
  169. if debug_mode:
  170. pass
  171. else: # 如果debug没开,则检测是否有needdrop字段
  172. if jso.get("need_drop", False):
  173. logger.info(
  174. f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop",
  175. file=sys.stderr,
  176. )
  177. jso["dropped"] = True
  178. return jso
  179. try:
  180. data_source = get_data_source(jso)
  181. file_id = jso.get("file_id")
  182. book_name = f"{data_source}/{file_id}"
  183. title = jso.get("title")
  184. url_encode_title = quote(title, safe="")
  185. if data_source != "scihub":
  186. return jso
  187. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  188. # 将 pdf_intermediate_dict 解压
  189. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  190. i = 0
  191. for page in pdf_intermediate_dict.values():
  192. if page.get("tables"):
  193. if len(page["tables"]) > 0:
  194. j = 0
  195. for table in page["tables"]:
  196. if debug_mode:
  197. image_path = join_path(
  198. "s3://mllm-raw-media/pdf2md_img/",
  199. book_name,
  200. table["image_path"],
  201. )
  202. else:
  203. image_path = join_path(
  204. "s3://mllm-raw-media/pdf2md_img/", table["image_path"]
  205. )
  206. if image_path.endswith(".jpg"):
  207. j += 1
  208. s3_client = get_s3_client(image_path)
  209. bucket_name, bucket_key = parse_bucket_key(image_path)
  210. # 通过s3_client获取图片到内存
  211. image_bytes = s3_client.get_object(
  212. Bucket=bucket_name, Key=bucket_key
  213. )["Body"].read()
  214. # 保存图片到新的位置
  215. if debug_mode:
  216. new_image_path = join_path(
  217. "s3://mllm-raw-media/pdf2md_img/table_new/",
  218. url_encode_title
  219. + "_"
  220. + table["image_path"].lstrip("tables/"),
  221. )
  222. else:
  223. new_image_path = join_path(
  224. "s3://mllm-raw-media/pdf2md_img/table_new/",
  225. url_encode_title + f"_page{i}_{j}.jpg",
  226. )
  227. logger.info(new_image_path, file=sys.stderr)
  228. bucket_name, bucket_key = parse_bucket_key(new_image_path)
  229. s3_client.put_object(
  230. Bucket=bucket_name, Key=bucket_key, Body=image_bytes
  231. )
  232. else:
  233. continue
  234. i += 1
  235. # 把无用的信息清空
  236. jso["doc_layout_result"] = ""
  237. jso["pdf_intermediate_dict"] = ""
  238. jso["pdf_meta"] = ""
  239. except Exception as e:
  240. jso = exception_handler(jso, e)
  241. return jso
  242. def drop_needdrop_pdf(jso: dict) -> dict:
  243. if jso.get("need_drop", False):
  244. logger.info(
  245. f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop",
  246. file=sys.stderr,
  247. )
  248. jso["dropped"] = True
  249. return jso
  250. def pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
  251. if debug_mode:
  252. pass
  253. else: # 如果debug没开,则检测是否有needdrop字段
  254. if jso.get("need_drop", False):
  255. book_name = join_path(get_data_source(jso), jso["file_id"])
  256. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  257. jso["dropped"] = True
  258. return jso
  259. try:
  260. pdf_intermediate_dict = jso["pdf_intermediate_dict"]
  261. # 将 pdf_intermediate_dict 解压
  262. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  263. # markdown_content = mk_nlp_markdown(pdf_intermediate_dict)
  264. jso["content_list"] = mk_universal_format(pdf_intermediate_dict)
  265. # jso["content"] = markdown_content
  266. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']}")
  267. # 把无用的信息清空
  268. jso["doc_layout_result"] = ""
  269. jso["pdf_intermediate_dict"] = ""
  270. jso["pdf_meta"] = ""
  271. except Exception as e:
  272. jso = exception_handler(jso, e)
  273. return jso
  274. def parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  275. # 检测debug开关
  276. if debug_mode:
  277. pass
  278. else: # 如果debug没开,则检测是否有needdrop字段
  279. if jso.get("need_drop", False):
  280. return jso
  281. # 开始正式逻辑
  282. s3_pdf_path = jso.get("file_location")
  283. s3_config = get_s3_config(s3_pdf_path)
  284. model_output_json_list = jso.get("doc_layout_result")
  285. data_source = get_data_source(jso)
  286. file_id = jso.get("file_id")
  287. book_name = f"{data_source}/{file_id}"
  288. # 1.23.22已修复
  289. # if debug_mode:
  290. # pass
  291. # else:
  292. # if book_name == "zlib/zlib_21929367":
  293. # jso['need_drop'] = True
  294. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  295. # return jso
  296. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  297. # total_page = jso['pdf_meta']['total_page']
  298. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  299. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  300. max_svgs = max(svgs_per_page_list)
  301. if max_svgs > 3000:
  302. jso["need_drop"] = True
  303. jso["drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  304. # elif total_page > 1000:
  305. # jso['need_drop'] = True
  306. # jso['drop_reason'] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_TOTAL_PAGES
  307. else:
  308. try:
  309. save_path = s3_image_save_path
  310. image_s3_config = get_s3_config(save_path)
  311. start_time = time.time() # 记录开始时间
  312. # 先打印一下book_name和解析开始的时间
  313. logger.info(
  314. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  315. file=sys.stderr,
  316. )
  317. pdf_info_dict = parse_pdf_by_model(
  318. s3_pdf_path,
  319. s3_config,
  320. model_output_json_list,
  321. save_path,
  322. book_name,
  323. pdf_model_profile=None,
  324. image_s3_config=image_s3_config,
  325. start_page_id=start_page_id,
  326. junk_img_bojids=junk_img_bojids,
  327. debug_mode=debug_mode,
  328. )
  329. if pdf_info_dict.get(
  330. "need_drop", False
  331. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  332. jso["need_drop"] = True
  333. jso["drop_reason"] = pdf_info_dict["drop_reason"]
  334. else: # 正常返回,将 pdf_info_dict 压缩并存储
  335. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  336. jso["pdf_intermediate_dict"] = pdf_info_dict
  337. end_time = time.time() # 记录完成时间
  338. parse_time = int(end_time - start_time) # 计算执行时间
  339. # 解析完成后打印一下book_name和耗时
  340. logger.info(
  341. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  342. file=sys.stderr,
  343. )
  344. jso["parse_time"] = parse_time
  345. except Exception as e:
  346. jso = exception_handler(jso, e)
  347. return jso
  348. """
  349. 统一处理逻辑
  350. 1.先调用parse_pdf对文本类pdf进行处理
  351. 2.再调用ocr_dropped_parse_pdf,对之前drop的pdf进行处理
  352. """
  353. def uni_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  354. jso = parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  355. jso = ocr_dropped_parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
  356. return jso
  357. def parse_pdf_for_model_train(jso: dict, start_page_id=0, debug_mode=False) -> dict:
  358. # 检测debug开关
  359. if debug_mode:
  360. pass
  361. else: # 如果debug没开,则检测是否有needdrop字段
  362. if jso.get("need_drop", False):
  363. return jso
  364. # 开始正式逻辑
  365. s3_pdf_path = jso.get("file_location")
  366. s3_config = get_s3_config(s3_pdf_path)
  367. model_output_json_list = jso.get("doc_layout_result")
  368. data_source = get_data_source(jso)
  369. file_id = jso.get("file_id")
  370. book_name = f"{data_source}/{file_id}"
  371. # 1.23.22已修复
  372. # if debug_mode:
  373. # pass
  374. # else:
  375. # if book_name == "zlib/zlib_21929367":
  376. # jso['need_drop'] = True
  377. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  378. # return jso
  379. junk_img_bojids = jso["pdf_meta"]["junk_img_bojids"]
  380. # total_page = jso['pdf_meta']['total_page']
  381. # 增加检测 max_svgs 数量的检测逻辑,如果 max_svgs 超过3000则drop
  382. svgs_per_page_list = jso["pdf_meta"]["svgs_per_page"]
  383. max_svgs = max(svgs_per_page_list)
  384. if max_svgs > 3000:
  385. jso["need_drop"] = True
  386. jso["drop_reason"] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
  387. # elif total_page > 1000:
  388. # jso['need_drop'] = True
  389. # jso['drop_reason'] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_TOTAL_PAGES
  390. else:
  391. try:
  392. save_path = s3_image_save_path
  393. image_s3_config = get_s3_config(save_path)
  394. start_time = time.time() # 记录开始时间
  395. # 先打印一下book_name和解析开始的时间
  396. logger.info(
  397. f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
  398. file=sys.stderr,
  399. )
  400. pdf_info_dict = parse_pdf_for_train(
  401. s3_pdf_path,
  402. s3_config,
  403. model_output_json_list,
  404. save_path,
  405. book_name,
  406. pdf_model_profile=None,
  407. image_s3_config=image_s3_config,
  408. start_page_id=start_page_id,
  409. junk_img_bojids=junk_img_bojids,
  410. debug_mode=debug_mode,
  411. )
  412. if pdf_info_dict.get(
  413. "need_drop", False
  414. ): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  415. jso["need_drop"] = True
  416. jso["drop_reason"] = pdf_info_dict["drop_reason"]
  417. else: # 正常返回,将 pdf_info_dict 压缩并存储
  418. jso["parsed_results"] = convert_to_train_format(pdf_info_dict)
  419. pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
  420. jso["pdf_intermediate_dict"] = pdf_info_dict
  421. end_time = time.time() # 记录完成时间
  422. parse_time = int(end_time - start_time) # 计算执行时间
  423. # 解析完成后打印一下book_name和耗时
  424. logger.info(
  425. f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
  426. file=sys.stderr,
  427. )
  428. jso["parse_time"] = parse_time
  429. except Exception as e:
  430. jso = exception_handler(jso, e)
  431. return jso
  432. if __name__ == "__main__":
  433. pass