import time # from anyio import Path from magic_pdf.libs.commons import fitz, get_delta_time, get_img_s3_client import json import os import math from loguru import logger from magic_pdf.layout.bbox_sort import ( prepare_bboxes_for_layout_split, ) from magic_pdf.layout.layout_sort import LAYOUT_UNPROC, get_bboxes_layout, get_columns_cnt_of_layout, sort_text_block from magic_pdf.libs.drop_reason import DropReason from magic_pdf.libs.markdown_utils import escape_special_markdown_char from magic_pdf.libs.safe_filename import sanitize_filename from magic_pdf.libs.vis_utils import draw_bbox_on_page, draw_layout_bbox_on_page from magic_pdf.pre_proc.detect_images import parse_images from magic_pdf.pre_proc.detect_tables import parse_tables # 获取tables的bbox from magic_pdf.pre_proc.detect_equation import parse_equations # 获取equations的bbox from magic_pdf.pre_proc.detect_header import parse_headers # 获取headers的bbox from magic_pdf.pre_proc.detect_page_number import parse_pageNos # 获取pageNos的bbox from magic_pdf.pre_proc.detect_footnote import parse_footnotes_by_model, parse_footnotes_by_rule # 获取footnotes的bbox from magic_pdf.pre_proc.detect_footer_by_model import parse_footers # 获取footers的bbox from magic_pdf.post_proc.detect_para import ( ParaProcessPipeline, TitleDetectionException, TitleLevelException, ParaSplitException, ParaMergeException, DenseSingleLineBlockException, ) from magic_pdf.pre_proc.main_text_font import get_main_text_font from magic_pdf.pre_proc.remove_colored_strip_bbox import remove_colored_strip_textblock from magic_pdf.pre_proc.remove_footer_header import remove_headder_footer_one_page ''' from para.para_pipeline import ParaProcessPipeline from para.exceptions import ( TitleDetectionException, TitleLevelException, ParaSplitException, ParaMergeException, DenseSingleLineBlockException, ) ''' from magic_pdf.libs.commons import read_file, join_path from magic_pdf.libs.pdf_image_tools import save_images_by_bboxes from magic_pdf.post_proc.remove_footnote import merge_footnote_blocks, remove_footnote_blocks from magic_pdf.pre_proc.citationmarker_remove import remove_citation_marker from magic_pdf.pre_proc.equations_replace import combine_chars_to_pymudict, remove_chars_in_text_blocks, replace_equations_in_textblock from magic_pdf.pre_proc.pdf_pre_filter import pdf_filter from magic_pdf.pre_proc.detect_footer_header_by_statistics import drop_footer_header from magic_pdf.pre_proc.construct_paras import construct_page_component from magic_pdf.pre_proc.fix_image import combine_images, fix_image_vertical, fix_seperated_image, include_img_title from magic_pdf.post_proc.pdf_post_filter import pdf_post_filter from magic_pdf.pre_proc.remove_rotate_bbox import get_side_boundry, remove_rotate_side_textblock, remove_side_blank_block from magic_pdf.pre_proc.resolve_bbox_conflict import check_text_block_horizontal_overlap, resolve_bbox_overlap_conflict from magic_pdf.pre_proc.fix_table import fix_table_text_block, fix_tables, include_table_title denseSingleLineBlockException_msg = DenseSingleLineBlockException().message titleDetectionException_msg = TitleDetectionException().message titleLevelException_msg = TitleLevelException().message paraSplitException_msg = ParaSplitException().message paraMergeException_msg = ParaMergeException().message def get_docx_model_output(pdf_model_output, pdf_model_s3_profile, page_id): if isinstance(pdf_model_output, str): model_output_json_path = join_path(pdf_model_output, f"page_{page_id + 1}.json") # 模型输出的页面编号从1开始的 if os.path.exists(model_output_json_path): json_from_docx = read_file(model_output_json_path, pdf_model_s3_profile) model_output_json = json.loads(json_from_docx) else: try: model_output_json_path = join_path(pdf_model_output, "model.json") with open(model_output_json_path, "r", encoding="utf-8") as f: model_output_json = json.load(f) model_output_json = model_output_json["doc_layout_result"][page_id] except: s3_model_output_json_path = join_path(pdf_model_output, f"page_{page_id + 1}.json") s3_model_output_json_path = join_path(pdf_model_output, f"{page_id}.json") #s3_model_output_json_path = join_path(pdf_model_output, f"page_{page_id }.json") # logger.warning(f"model_output_json_path: {model_output_json_path} not found. try to load from s3: {s3_model_output_json_path}") s = read_file(s3_model_output_json_path, pdf_model_s3_profile) return json.loads(s) elif isinstance(pdf_model_output, list): model_output_json = pdf_model_output[page_id] return model_output_json def parse_pdf_by_model( s3_pdf_path, s3_pdf_profile, pdf_model_output, save_path, book_name, pdf_model_profile=None, image_s3_config=None, start_page_id=0, end_page_id=None, junk_img_bojids=[], debug_mode=False, ): pdf_bytes = read_file(s3_pdf_path, s3_pdf_profile) save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest") md_bookname_save_path = "" book_name = sanitize_filename(book_name) if debug_mode: save_path = join_path(save_tmp_path, "md") pdf_local_path = join_path(save_tmp_path, "download-pdfs", book_name) if not os.path.exists(os.path.dirname(pdf_local_path)): # 如果目录不存在,创建它 os.makedirs(os.path.dirname(pdf_local_path)) md_bookname_save_path = join_path(save_tmp_path, "md", book_name) if not os.path.exists(md_bookname_save_path): # 如果目录不存在,创建它 os.makedirs(md_bookname_save_path) with open(pdf_local_path + ".pdf", "wb") as pdf_file: pdf_file.write(pdf_bytes) pdf_docs = fitz.open("pdf", pdf_bytes) pdf_info_dict = {} img_s3_client = get_img_s3_client(save_path, image_s3_config) # 更改函数名和参数,避免歧义 # img_s3_client = "img_s3_client" #不创建这个对象,直接用字符串占位 start_time = time.time() """通过统计pdf全篇文字,识别正文字体""" main_text_font = get_main_text_font(pdf_docs) end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1 for page_id in range(start_page_id, end_page_id + 1): page = pdf_docs[page_id] page_width = page.rect.width page_height = page.rect.height if debug_mode: time_now = time.time() logger.info(f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}") start_time = time_now """ # 通过一个规则,过滤掉单页超过1500非junkimg的pdf # 对单页面非重复id的img数量做统计,如果当前页超过1500则直接return need_drop """ page_imgs = page.get_images() img_counts = 0 for img in page_imgs: img_bojid = img[0] if img_bojid in junk_img_bojids: # 判断这个图片在不在junklist中 continue # 如果在junklist就不用管了,跳过 else: recs = page.get_image_rects(img, transform=True) if recs: # 如果这张图在当前页面有展示 img_counts += 1 if img_counts >= 1500: # 如果去除了junkimg的影响,单页img仍然超过1500的话,就排除当前pdf logger.warning( f"page_id: {page_id}, img_counts: {img_counts}, drop this pdf: {book_name}, drop_reason: {DropReason.HIGH_COMPUTATIONAL_lOAD_BY_IMGS}" ) result = {"need_drop": True, "drop_reason": DropReason.HIGH_COMPUTATIONAL_lOAD_BY_IMGS} if not debug_mode: return result """ ================================================================================================================================== 首先获取基本的block数据,对pdf进行分解,获取图片、表格、公式、text的bbox """ # 解析pdf原始文本block text_raw_blocks = page.get_text( "dict", flags=fitz.TEXTFLAGS_TEXT, )["blocks"] model_output_json = get_docx_model_output(pdf_model_output, pdf_model_profile, page_id) # 解析图片 image_bboxes = parse_images(page_id, page, model_output_json, junk_img_bojids) image_bboxes = fix_image_vertical(image_bboxes, text_raw_blocks) # 修正图片的位置 image_bboxes = fix_seperated_image(image_bboxes) # 合并有边重合的图片 image_bboxes = include_img_title(text_raw_blocks, image_bboxes) # 向图片上方和下方寻找title,使用规则进行匹配,暂时只支持英文规则 """此时image_bboxes中可能出现这种情况,水平并列的2个图片,下方分别有各自的子标题,2个子标题下方又有大标题(形如Figxxx),会出现2个图片的bbox都包含了这个大标题,这种情况需要把图片合并""" image_bboxes = combine_images(image_bboxes) # 合并图片 # 解析表格并对table_bboxes进行位置的微调,防止表格周围的文字被截断 table_bboxes = parse_tables(page_id, page, model_output_json) table_bboxes = fix_tables(page, table_bboxes, include_table_title=True, scan_line_num=2) # 修正 table_bboxes = fix_table_text_block(text_raw_blocks, table_bboxes) # 修正与text block的关系,某些table修正与pymupdf获取到的table内textblock没有完全包含,因此要进行一次修正。 #debug_show_bbox(pdf_docs, page_id, table_bboxes, [], [b['bbox'] for b in text_raw_blocks], join_path(save_path, book_name, f"{book_name}_debug.pdf"), 7) table_bboxes = include_table_title(text_raw_blocks, table_bboxes) # 向table上方和下方寻找title,使用规则进行匹配,暂时只支持英文规则 # 解析公式 equations_inline_bboxes, equations_interline_bboxes = parse_equations(page_id, page, model_output_json) """ ================================================================================================================================== 进入预处理-1阶段 ------------------- # # 解析标题 # title_bboxs = parse_titles(page_id, page, model_output_json) # # 评估Layout是否规整、简单 # isSimpleLayout_flag, fullColumn_cnt, subColumn_cnt, curPage_loss = evaluate_pdf_layout(page_id, page, model_output_json) 接下来开始进行预处理过程 """ """去掉每页的页码、页眉、页脚""" page_no_bboxs = parse_pageNos(page_id, page, model_output_json) header_bboxs = parse_headers(page_id, page, model_output_json) footer_bboxs = parse_footers(page_id, page, model_output_json) image_bboxes, table_bboxes, remain_text_blocks, removed_hdr_foot_txt_block, removed_hdr_foot_img_block, removed_hdr_foot_table = remove_headder_footer_one_page(text_raw_blocks, image_bboxes, table_bboxes, header_bboxs, footer_bboxs, page_no_bboxs, page_width, page_height) """去除页面上半部分长条色块内的文本块""" remain_text_blocks, removed_colored_narrow_strip_background_text_block = remove_colored_strip_textblock(remain_text_blocks, page) #debug_show_bbox(pdf_docs, page_id, footnote_bboxes_by_model, [b['bbox'] for b in remain_text_blocks], header_bboxs, join_path(save_path, book_name, f"{book_name}_debug.pdf"), 7) """去掉旋转的文字:水印、垂直排列的文字""" remain_text_blocks, removed_non_horz_text_block = remove_rotate_side_textblock( remain_text_blocks, page_width, page_height ) # 去掉水印,非水平文字 remain_text_blocks, removed_empty_side_block = remove_side_blank_block(remain_text_blocks, page_width, page_height) # 删除页面四周可能会留下的完全空白的textblock,这种block形成原因未知 """出现在图片、表格上的文字块去掉,把层叠的图片单独分离出来,不参与layout的计算""" ( image_bboxes, table_bboxes, equations_interline_bboxes, equations_inline_bboxes, remain_text_blocks, text_block_on_image_removed, images_overlap_backup, interline_eq_temp_text_block ) = resolve_bbox_overlap_conflict( image_bboxes, table_bboxes, equations_interline_bboxes, equations_inline_bboxes, remain_text_blocks ) # """去掉footnote, 从文字和图片中""" # # 通过模型识别到的footnote # footnote_bboxes_by_model = parse_footnotes_by_model(page_id, page, model_output_json, md_bookname_save_path, # debug_mode=debug_mode) # # 通过规则识别到的footnote # footnote_bboxes_by_rule = parse_footnotes_by_rule(remain_text_blocks, page_height, page_id) """ ================================================================================================================================== """ if debug_mode: # debugmode截图到本地 save_path = join_path(save_tmp_path, "md") # 把图、表、公式都进行截图,保存到存储上,返回图片路径作为内容 image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info = save_images_by_bboxes( book_name, page_id, page, save_path, image_bboxes, images_overlap_backup, table_bboxes, equations_inline_bboxes, equations_interline_bboxes, # 传入img_s3_client img_s3_client, ) # 只要表格和图片的截图 """"以下进入到公式替换环节 """ char_level_text_blocks = page.get_text("rawdict", flags=fitz.TEXTFLAGS_TEXT)['blocks'] remain_text_blocks = combine_chars_to_pymudict(remain_text_blocks, char_level_text_blocks)# 合并chars remain_text_blocks = replace_equations_in_textblock(remain_text_blocks, inline_eq_info, interline_eq_info) remain_text_blocks = remove_citation_marker(remain_text_blocks) # 公式替换之后去角标,防止公式无法替换成功。但是这样也会带来个问题就是把角标当公式。各有优劣。 remain_text_blocks = remove_chars_in_text_blocks(remain_text_blocks) # 减少中间态数据体积 #debug_show_bbox(pdf_docs, page_id, [b['bbox'] for b in inline_eq_info], [b['bbox'] for b in interline_eq_info], [], join_path(save_path, book_name, f"{book_name}_debug.pdf"), 3) """去掉footnote, 从文字和图片中(先去角标再去footnote试试)""" # 通过模型识别到的footnote footnote_bboxes_by_model = parse_footnotes_by_model(page_id, page, model_output_json, md_bookname_save_path, debug_mode=debug_mode) # 通过规则识别到的footnote footnote_bboxes_by_rule = parse_footnotes_by_rule(remain_text_blocks, page_height, page_id, main_text_font) """进入pdf过滤器,去掉一些不合理的pdf""" is_good_pdf, err = pdf_filter(page, remain_text_blocks, table_bboxes, image_bboxes) if not is_good_pdf: logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {err}") if not debug_mode: return err """ ================================================================================================================================== 进行版面布局切分和过滤 """ """在切分之前,先检查一下bbox是否有左右重叠的情况,如果有,那么就认为这个pdf暂时没有能力处理好,这种左右重叠的情况大概率是由于pdf里的行间公式、表格没有被正确识别出来造成的 """ is_text_block_horz_overlap = check_text_block_horizontal_overlap(remain_text_blocks, header_bboxs, footer_bboxs) if is_text_block_horz_overlap: # debug_show_bbox(pdf_docs, page_id, [b['bbox'] for b in remain_text_blocks], [], [], join_path(save_path, book_name, f"{book_name}_debug.pdf"), 0) logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.TEXT_BLCOK_HOR_OVERLAP}") result = {"need_drop": True, "drop_reason": DropReason.TEXT_BLCOK_HOR_OVERLAP} if not debug_mode: return result """统一格式化成一个数据结构用于计算layout""" page_y0 = 0 if len(header_bboxs) == 0 else max([b[3] for b in header_bboxs]) page_y1 = page_height if len(footer_bboxs) == 0 else min([b[1] for b in footer_bboxs]) left_x, right_x = get_side_boundry(removed_non_horz_text_block, page_width, page_height) page_boundry = [math.floor(left_x), page_y0 + 1, math.ceil(right_x), page_y1 - 1] # 返回的是一个数组,每个元素[x0, y0, x1, y1, block_content, idx_x, idx_y], 初始时候idx_x, idx_y都是None. 对于图片、公式来说,block_content是图片的地址, 对于段落来说,block_content是段落的内容 all_bboxes = prepare_bboxes_for_layout_split( image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info, remain_text_blocks, page_boundry, page) #debug_show_bbox(pdf_docs, page_id, [], [], all_bboxes, join_path(save_path, book_name, f"{book_name}_debug.pdf"), 1) """page_y0, page_y1能够过滤掉页眉和页脚,不会算作layout内""" layout_bboxes, layout_tree = get_bboxes_layout(all_bboxes, page_boundry, page_id) if len(remain_text_blocks)>0 and len(all_bboxes)>0 and len(layout_bboxes)==0: logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}") result = {"need_drop": True, "drop_reason": DropReason.CAN_NOT_DETECT_PAGE_LAYOUT} if not debug_mode: return result """以下去掉复杂的布局和超过2列的布局""" if any([lay["layout_label"] == LAYOUT_UNPROC for lay in layout_bboxes]): # 复杂的布局 logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.COMPLICATED_LAYOUT}") result = {"need_drop": True, "drop_reason": DropReason.COMPLICATED_LAYOUT} if not debug_mode: return result layout_column_width = get_columns_cnt_of_layout(layout_tree) if layout_column_width > 2: # 去掉超过2列的布局pdf logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}") result = { "need_drop": True, "drop_reason": DropReason.TOO_MANY_LAYOUT_COLUMNS, "extra_info": {"column_cnt": layout_column_width}, } if not debug_mode: return result """ ================================================================================================================================== 构造出下游需要的数据结构 """ remain_text_blocks = remain_text_blocks + interline_eq_temp_text_block # 把计算layout时候临时删除的行间公式再放回去,防止行间公式替换的时候丢失。 removed_text_blocks = [] removed_text_blocks.extend(removed_hdr_foot_txt_block) # removed_text_blocks.extend(removed_footnote_text_block) removed_text_blocks.extend(text_block_on_image_removed) removed_text_blocks.extend(removed_non_horz_text_block) removed_text_blocks.extend(removed_colored_narrow_strip_background_text_block) removed_images = [] # removed_images.extend(footnote_imgs) removed_images.extend(removed_hdr_foot_img_block) images_backup = [] images_backup.extend(image_backup_info) remain_text_blocks = escape_special_markdown_char(remain_text_blocks) # 转义span里的text sorted_text_remain_text_block = sort_text_block(remain_text_blocks, layout_bboxes) footnote_bboxes_tmp = [] footnote_bboxes_tmp.extend(footnote_bboxes_by_model) footnote_bboxes_tmp.extend(footnote_bboxes_by_rule) page_info = construct_page_component( page_id, image_info, table_info, sorted_text_remain_text_block, layout_bboxes, inline_eq_info, interline_eq_info, page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"], removed_text_blocks=removed_text_blocks, removed_image_blocks=removed_images, images_backup=images_backup, droped_table_block=[], table_backup=[], layout_tree=layout_tree, page_w=page.rect.width, page_h=page.rect.height, footnote_bboxes_tmp=footnote_bboxes_tmp ) pdf_info_dict[f"page_{page_id}"] = page_info # end page for '''计算后处理阶段耗时''' start_time = time.time() """ ================================================================================================================================== 去掉页眉和页脚,这里需要用到一定的统计量,所以放到最后 页眉和页脚主要从文本box和图片box中去除,位于页面的四周。 下面函数会直接修改pdf_info_dict,从文字块中、图片中删除属于页眉页脚的内容,删除内容做相对应记录 """ # 去页眉页脚 header, footer = drop_footer_header(pdf_info_dict) """对单个layout内footnote和他下面的所有textbbox合并""" for page_key, page_info in pdf_info_dict.items(): page_info = merge_footnote_blocks(page_info, main_text_font) page_info = remove_footnote_blocks(page_info) pdf_info_dict[page_key] = page_info """进入pdf后置过滤器,去掉一些不合理的pdf""" i = 0 for page_info in pdf_info_dict.values(): is_good_pdf, err = pdf_post_filter(page_info) if not is_good_pdf: logger.warning(f"page_id: {i}, drop this pdf: {book_name}, reason: {err}") if not debug_mode: return err i += 1 if debug_mode: params_file_save_path = join_path(save_tmp_path, "md", book_name, "preproc_out.json") page_draw_rect_save_path = join_path(save_tmp_path, "md", book_name, "layout.pdf") # dir_path = os.path.dirname(page_draw_rect_save_path) # if not os.path.exists(dir_path): # # 如果目录不存在,创建它 # os.makedirs(dir_path) with open(params_file_save_path, "w", encoding="utf-8") as f: json.dump(pdf_info_dict, f, ensure_ascii=False, indent=4) # 先检测本地 page_draw_rect_save_path 是否存在,如果存在则删除 if os.path.exists(page_draw_rect_save_path): os.remove(page_draw_rect_save_path) # 绘制bbox和layout到pdf draw_bbox_on_page(pdf_docs, pdf_info_dict, page_draw_rect_save_path) draw_layout_bbox_on_page(pdf_docs, pdf_info_dict, header, footer, page_draw_rect_save_path) if debug_mode: # 打印后处理阶段耗时 logger.info(f"post_processing_time: {get_delta_time(start_time)}") """ ================================================================================================================================== 进入段落处理-2阶段 """ start_time = time.time() para_process_pipeline = ParaProcessPipeline() def _deal_with_text_exception(error_info): logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {error_info}") if error_info == denseSingleLineBlockException_msg: logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.DENSE_SINGLE_LINE_BLOCK}") result = {"need_drop": True, "drop_reason": DropReason.DENSE_SINGLE_LINE_BLOCK} return result if error_info == titleDetectionException_msg: logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.TITLE_DETECTION_FAILED}") result = {"need_drop": True, "drop_reason": DropReason.TITLE_DETECTION_FAILED} return result elif error_info == titleLevelException_msg: logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.TITLE_LEVEL_FAILED}") result = {"need_drop": True, "drop_reason": DropReason.TITLE_LEVEL_FAILED} return result elif error_info == paraSplitException_msg: logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.PARA_SPLIT_FAILED}") result = {"need_drop": True, "drop_reason": DropReason.PARA_SPLIT_FAILED} return result elif error_info == paraMergeException_msg: logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.PARA_MERGE_FAILED}") result = {"need_drop": True, "drop_reason": DropReason.PARA_MERGE_FAILED} return result if debug_mode: input_pdf_file = f"{pdf_local_path}.pdf" output_dir = f"{save_path}/{book_name}" output_pdf_file = f"{output_dir}/pdf_annos.pdf" """ Call the para_process_pipeline function to process the pdf_info_dict. Parameters: para_debug_mode: str or None If para_debug_mode is None, the para_process_pipeline will not keep any intermediate results. If para_debug_mode is "simple", the para_process_pipeline will only keep the annos on the pdf and the final results as a json file. If para_debug_mode is "full", the para_process_pipeline will keep all the intermediate results generated during each step. """ pdf_info_dict, error_info = para_process_pipeline.para_process_pipeline( pdf_info_dict, para_debug_mode="simple", input_pdf_path=input_pdf_file, output_pdf_path=output_pdf_file, ) # 打印段落处理阶段耗时 logger.info(f"para_process_time: {get_delta_time(start_time)}") # debug的时候不return drop信息 if error_info is not None: _deal_with_text_exception(error_info) return pdf_info_dict else: pdf_info_dict, error_info = para_process_pipeline.para_process_pipeline(pdf_info_dict) if error_info is not None: return _deal_with_text_exception(error_info) return pdf_info_dict