pipeline.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. # coding=utf8
  2. import sys
  3. import time
  4. from urllib.parse import quote
  5. from magic_pdf.dict2md.ocr_mkcontent import ocr_mk_nlp_markdown, ocr_mk_mm_markdown, ocr_mk_mm_standard_format
  6. from magic_pdf.libs.commons import read_file, join_path, parse_bucket_key, formatted_time, s3_image_save_path
  7. from magic_pdf.libs.drop_reason import DropReason
  8. from magic_pdf.libs.json_compressor import JsonCompressor
  9. from magic_pdf.dict2md.mkcontent import mk_nlp_markdown, mk_universal_format
  10. from magic_pdf.pdf_parse_by_model import parse_pdf_by_model
  11. from magic_pdf.filter.pdf_classify_by_type import classify
  12. from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
  13. from loguru import logger
  14. from app.common.s3 import get_s3_config, get_s3_client
  15. from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
  16. def exception_handler(jso: dict, e):
  17. logger.exception(e)
  18. jso['need_drop'] = True
  19. jso['drop_reason'] = DropReason.Exception
  20. jso['exception'] = f"ERROR: {e}"
  21. return jso
  22. def get_data_type(jso: dict):
  23. data_type = jso.get('data_type')
  24. if data_type is None:
  25. data_type = jso.get('file_type')
  26. return data_type
  27. def get_bookid(jso: dict):
  28. book_id = jso.get('bookid')
  29. if book_id is None:
  30. book_id = jso.get('original_file_id')
  31. return book_id
  32. def get_data_source(jso: dict):
  33. data_source = jso.get('data_source')
  34. if data_source is None:
  35. data_source = jso.get('file_source')
  36. return data_source
  37. def meta_scan(jso: dict, doc_layout_check=True) -> dict:
  38. s3_pdf_path = jso.get('file_location')
  39. s3_config = get_s3_config(s3_pdf_path)
  40. if doc_layout_check:
  41. if 'doc_layout_result' not in jso: # 检测json中是存在模型数据,如果没有则需要跳过该pdf
  42. jso['need_drop'] = True
  43. jso['drop_reason'] = DropReason.MISS_DOC_LAYOUT_RESULT
  44. return jso
  45. try:
  46. data_source = get_data_source(jso)
  47. file_id = jso.get('file_id')
  48. book_name = f"{data_source}/{file_id}"
  49. # 首页存在超量drawing问题
  50. # special_pdf_list = ['zlib/zlib_21822650']
  51. # if book_name in special_pdf_list:
  52. # jso['need_drop'] = True
  53. # jso['drop_reason'] = DropReason.SPECIAL_PDF
  54. # return jso
  55. start_time = time.time() # 记录开始时间
  56. logger.info(f"book_name is:{book_name},start_time is:{formatted_time(start_time)}", file=sys.stderr)
  57. file_content = read_file(s3_pdf_path, s3_config)
  58. read_file_time = int(time.time() - start_time) # 计算执行时间
  59. start_time = time.time() # 记录开始时间
  60. res = pdf_meta_scan(s3_pdf_path, file_content)
  61. if res.get('need_drop', False): # 如果返回的字典里有need_drop,则提取drop_reason并跳过本次解析
  62. jso['need_drop'] = True
  63. jso['drop_reason'] = res["drop_reason"]
  64. else: # 正常返回
  65. jso['pdf_meta'] = res
  66. jso['content'] = ""
  67. jso['remark'] = ""
  68. jso['data_url'] = ""
  69. end_time = time.time() # 记录结束时间
  70. meta_scan_time = int(end_time - start_time) # 计算执行时间
  71. logger.info(f"book_name is:{book_name},end_time is:{formatted_time(end_time)},read_file_time is:{read_file_time},meta_scan_time is:{meta_scan_time}", file=sys.stderr)
  72. jso['read_file_time'] = read_file_time
  73. jso['meta_scan_time'] = meta_scan_time
  74. except Exception as e:
  75. jso = exception_handler(jso, e)
  76. return jso
def classify_by_type(jso: dict, debug_mode=False) -> dict:
    """Classify the scanned PDF as text-based or not, using ``jso['pdf_meta']``.

    Encrypted/password-protected PDFs are dropped; text PDFs in languages
    other than zh/en are dropped; non-text PDFs are marked dropped with the
    failing classifier rules recorded in ``jso['extra_info']``.  Mutates and
    returns ``jso``.
    """
    # Check the debug switch.
    if debug_mode:
        pass
    else:  # Debug off: skip jobs already flagged with need_drop.
        if jso.get('need_drop', False):
            return jso
    # Main logic begins here.
    try:
        pdf_meta = jso.get('pdf_meta')
        data_source = get_data_source(jso)
        file_id = jso.get('file_id')
        book_name = f"{data_source}/{file_id}"
        total_page = pdf_meta["total_page"]
        page_width = pdf_meta["page_width_pts"]
        page_height = pdf_meta["page_height_pts"]
        img_sz_list = pdf_meta["image_info_per_page"]
        img_num_list = pdf_meta['imgs_per_page']
        text_len_list = pdf_meta['text_len_per_page']
        text_layout_list = pdf_meta['text_layout_per_page']
        text_language = pdf_meta['text_language']
        # allow_language = ['zh', 'en']  # allowed languages: currently only Simplified Chinese and English
        # if text_language not in allow_language:  # drop if the language is not allowed
        #     jso['need_drop'] = True
        #     jso['drop_reason'] = DropReason.NOT_ALLOW_LANGUAGE
        #     return jso
        pdf_path = pdf_meta['pdf_path']
        is_encrypted = pdf_meta['is_encrypted']
        is_needs_password = pdf_meta['is_needs_password']
        if is_encrypted or is_needs_password:  # encrypted or password-protected PDFs are not processed
            jso['need_drop'] = True
            jso['drop_reason'] = DropReason.ENCRYPTED
        else:
            start_time = time.time()  # record start time
            is_text_pdf, results = classify(pdf_path, total_page, page_width, page_height, img_sz_list, text_len_list, img_num_list, text_layout_list)
            classify_time = int(time.time() - start_time)  # elapsed seconds for classification
            if is_text_pdf:
                pdf_meta['is_text_pdf'] = is_text_pdf
                jso['pdf_meta'] = pdf_meta
                jso['classify_time'] = classify_time
                # print(json.dumps(pdf_meta, ensure_ascii=False))
                allow_language = ['zh', 'en']  # allowed languages: currently only Simplified Chinese and English
                if text_language not in allow_language:  # drop if the language is not allowed
                    jso['need_drop'] = True
                    jso['drop_reason'] = DropReason.NOT_ALLOW_LANGUAGE
                    return jso
            else:
                # Not a text PDF: mark for dropping (for now) and record which
                # classifier rules failed, for later analysis.
                pdf_meta['is_text_pdf'] = is_text_pdf
                jso['pdf_meta'] = pdf_meta
                jso['classify_time'] = classify_time
                jso['need_drop'] = True
                jso['drop_reason'] = DropReason.NOT_IS_TEXT_PDF
                extra_info = {"classify_rules": []}
                for condition, result in results.items():
                    if not result:
                        extra_info["classify_rules"].append(condition)
                jso['extra_info'] = extra_info
    except Exception as e:
        jso = exception_handler(jso, e)
    return jso
  138. def save_tables_to_s3(jso: dict, debug_mode=False) -> dict:
  139. if debug_mode:
  140. pass
  141. else:# 如果debug没开,则检测是否有needdrop字段
  142. if jso.get('need_drop', False):
  143. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop", file=sys.stderr)
  144. jso["dropped"] = True
  145. return jso
  146. try:
  147. data_source = get_data_source(jso)
  148. file_id = jso.get('file_id')
  149. book_name = f"{data_source}/{file_id}"
  150. title = jso.get('title')
  151. url_encode_title = quote(title, safe='')
  152. if data_source != 'scihub':
  153. return jso
  154. pdf_intermediate_dict = jso['pdf_intermediate_dict']
  155. # 将 pdf_intermediate_dict 解压
  156. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  157. i = 0
  158. for page in pdf_intermediate_dict.values():
  159. if page.get('tables'):
  160. if len(page['tables']) > 0:
  161. j = 0
  162. for table in page['tables']:
  163. if debug_mode:
  164. image_path = join_path("s3://mllm-raw-media/pdf2md_img/", book_name, table['image_path'])
  165. else:
  166. image_path = join_path("s3://mllm-raw-media/pdf2md_img/", table['image_path'])
  167. if image_path.endswith('.jpg'):
  168. j += 1
  169. s3_client = get_s3_client(image_path)
  170. bucket_name, bucket_key = parse_bucket_key(image_path)
  171. # 通过s3_client获取图片到内存
  172. image_bytes = s3_client.get_object(Bucket=bucket_name, Key=bucket_key)['Body'].read()
  173. # 保存图片到新的位置
  174. if debug_mode:
  175. new_image_path = join_path("s3://mllm-raw-media/pdf2md_img/table_new/", url_encode_title + "_" + table['image_path'].lstrip('tables/'))
  176. else:
  177. new_image_path = join_path("s3://mllm-raw-media/pdf2md_img/table_new/", url_encode_title + f"_page{i}_{j}.jpg")
  178. logger.info(new_image_path, file=sys.stderr)
  179. bucket_name, bucket_key = parse_bucket_key(new_image_path)
  180. s3_client.put_object(Bucket=bucket_name, Key=bucket_key, Body=image_bytes)
  181. else:
  182. continue
  183. i += 1
  184. # 把无用的信息清空
  185. jso["doc_layout_result"] = ""
  186. jso["pdf_intermediate_dict"] = ""
  187. jso["pdf_meta"] = ""
  188. except Exception as e:
  189. jso = exception_handler(jso, e)
  190. return jso
  191. def drop_needdrop_pdf(jso: dict) -> dict:
  192. if jso.get('need_drop', False):
  193. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']} need drop", file=sys.stderr)
  194. jso["dropped"] = True
  195. return jso
  196. def pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
  197. if debug_mode:
  198. pass
  199. else:# 如果debug没开,则检测是否有needdrop字段
  200. if jso.get('need_drop', False):
  201. book_name = join_path(get_data_source(jso), jso['file_id'])
  202. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  203. jso["dropped"] = True
  204. return jso
  205. try:
  206. pdf_intermediate_dict = jso['pdf_intermediate_dict']
  207. # 将 pdf_intermediate_dict 解压
  208. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  209. #markdown_content = mk_nlp_markdown(pdf_intermediate_dict)
  210. jso['content_list'] = mk_universal_format(pdf_intermediate_dict)
  211. #jso["content"] = markdown_content
  212. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']}")
  213. # 把无用的信息清空
  214. jso["doc_layout_result"] = ""
  215. jso["pdf_intermediate_dict"] = ""
  216. jso["pdf_meta"] = ""
  217. except Exception as e:
  218. jso = exception_handler(jso, e)
  219. return jso
def parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
    """Parse a text PDF using the layout-model-driven pipeline.

    Reads the PDF from S3 plus the per-page model output in
    ``jso['doc_layout_result']``, runs ``parse_pdf_by_model`` and stores the
    compressed result in ``jso['pdf_intermediate_dict']``.  PDFs whose
    busiest page holds more than 3000 SVGs are dropped as too expensive.
    Mutates and returns ``jso``.
    """
    # Check the debug switch.
    if debug_mode:
        pass
    else:  # Debug off: skip jobs already flagged with need_drop.
        if jso.get('need_drop', False):
            return jso
    # Main logic begins here.
    s3_pdf_path = jso.get('file_location')
    s3_config = get_s3_config(s3_pdf_path)
    model_output_json_list = jso.get('doc_layout_result')
    data_source = get_data_source(jso)
    file_id = jso.get('file_id')
    book_name = f"{data_source}/{file_id}"
    # Fixed in 1.23.22
    # if debug_mode:
    #     pass
    # else:
    #     if book_name == "zlib/zlib_21929367":
    #         jso['need_drop'] = True
    #         jso['drop_reason'] = DropReason.SPECIAL_PDF
    #         return jso
    junk_img_bojids = jso['pdf_meta']['junk_img_bojids']
    # total_page = jso['pdf_meta']['total_page']
    # Guard on max_svgs: drop the PDF if any single page carries more than 3000 SVGs.
    svgs_per_page_list = jso['pdf_meta']['svgs_per_page']
    max_svgs = max(svgs_per_page_list)
    if max_svgs > 3000:
        jso['need_drop'] = True
        jso['drop_reason'] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_SVGS
    # elif total_page > 1000:
    #     jso['need_drop'] = True
    #     jso['drop_reason'] = DropReason.HIGH_COMPUTATIONAL_lOAD_BY_TOTAL_PAGES
    else:
        try:
            save_path = s3_image_save_path
            image_s3_config = get_s3_config(save_path)
            start_time = time.time()  # record start time
            # Log book_name and the parse start time up front.
            logger.info(f"book_name is:{book_name},start_time is:{formatted_time(start_time)}", file=sys.stderr)
            pdf_info_dict = parse_pdf_by_model(s3_pdf_path, s3_config, model_output_json_list, save_path,
                                              book_name, pdf_model_profile=None,
                                              image_s3_config=image_s3_config,
                                              start_page_id=start_page_id, junk_img_bojids=junk_img_bojids,
                                              debug_mode=debug_mode)
            if pdf_info_dict.get('need_drop', False):  # parser asked us to drop: propagate drop_reason and skip
                jso['need_drop'] = True
                jso['drop_reason'] = pdf_info_dict["drop_reason"]
            else:  # normal return: compress and store pdf_info_dict
                pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
                jso['pdf_intermediate_dict'] = pdf_info_dict
            end_time = time.time()  # record end time
            parse_time = int(end_time - start_time)  # elapsed seconds
            # Log book_name and elapsed time once parsing is done.
            logger.info(f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}", file=sys.stderr)
            jso['parse_time'] = parse_time
        except Exception as e:
            jso = exception_handler(jso, e)
    return jso
def ocr_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
    """Parse a PDF with the OCR pipeline and store the compressed result.

    Reads the PDF from S3 plus the per-page model output in
    ``jso['doc_layout_result']``, runs ``parse_pdf_by_ocr`` and stores the
    compressed result in ``jso['pdf_intermediate_dict']``.  Mutates and
    returns ``jso``.
    """
    # Check the debug switch.
    if debug_mode:
        pass
    else:  # Debug off: skip jobs already flagged with need_drop.
        if jso.get('need_drop', False):
            return jso
    s3_pdf_path = jso.get('file_location')
    s3_config = get_s3_config(s3_pdf_path)
    model_output_json_list = jso.get('doc_layout_result')
    data_source = get_data_source(jso)
    file_id = jso.get('file_id')
    book_name = f"{data_source}/{file_id}"
    try:
        save_path = s3_image_save_path
        image_s3_config = get_s3_config(save_path)
        start_time = time.time()  # record start time
        # Log book_name and the parse start time up front.
        logger.info(f"book_name is:{book_name},start_time is:{formatted_time(start_time)}", file=sys.stderr)
        pdf_info_dict = parse_pdf_by_ocr(
            s3_pdf_path,
            s3_config,
            model_output_json_list,
            save_path,
            book_name,
            pdf_model_profile=None,
            image_s3_config=image_s3_config,
            start_page_id=start_page_id,
            debug_mode=debug_mode
        )
        if pdf_info_dict.get('need_drop', False):  # parser asked us to drop: propagate drop_reason and skip
            jso['need_drop'] = True
            jso['drop_reason'] = pdf_info_dict["drop_reason"]
        else:  # normal return: compress and store pdf_info_dict
            pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
            jso['pdf_intermediate_dict'] = pdf_info_dict
        end_time = time.time()  # record end time
        parse_time = int(end_time - start_time)  # elapsed seconds
        # Log book_name and elapsed time once parsing is done.
        logger.info(f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
                    file=sys.stderr)
        jso['parse_time'] = parse_time
    except Exception as e:
        jso = exception_handler(jso, e)
    return jso
  324. def ocr_pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
  325. if debug_mode:
  326. pass
  327. else: # 如果debug没开,则检测是否有needdrop字段
  328. if jso.get('need_drop', False):
  329. book_name = join_path(get_data_source(jso), jso['file_id'])
  330. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  331. jso["dropped"] = True
  332. return jso
  333. try:
  334. pdf_intermediate_dict = jso['pdf_intermediate_dict']
  335. # 将 pdf_intermediate_dict 解压
  336. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  337. markdown_content = ocr_mk_mm_markdown(pdf_intermediate_dict)
  338. jso["content"] = markdown_content
  339. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}", file=sys.stderr)
  340. # 把无用的信息清空
  341. jso["doc_layout_result"] = ""
  342. jso["pdf_intermediate_dict"] = ""
  343. jso["pdf_meta"] = ""
  344. except Exception as e:
  345. jso = exception_handler(jso, e)
  346. return jso
  347. def ocr_pdf_intermediate_dict_to_standard_format(jso: dict, debug_mode=False) -> dict:
  348. if debug_mode:
  349. pass
  350. else: # 如果debug没开,则检测是否有needdrop字段
  351. if jso.get('need_drop', False):
  352. book_name = join_path(get_data_source(jso), jso['file_id'])
  353. logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
  354. jso["dropped"] = True
  355. return jso
  356. try:
  357. pdf_intermediate_dict = jso['pdf_intermediate_dict']
  358. # 将 pdf_intermediate_dict 解压
  359. pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
  360. standard_format = ocr_mk_mm_standard_format(pdf_intermediate_dict)
  361. jso["content_list"] = standard_format
  362. logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}", file=sys.stderr)
  363. # 把无用的信息清空
  364. jso["doc_layout_result"] = ""
  365. jso["pdf_intermediate_dict"] = ""
  366. jso["pdf_meta"] = ""
  367. except Exception as e:
  368. jso = exception_handler(jso, e)
  369. return jso
if __name__ == "__main__":
    # No standalone CLI; this module is imported and driven by the pipeline runner.
    pass