"""对pymupdf返回的结构里的公式进行替换,替换为模型识别的公式结果.""" import json import os from pathlib import Path from loguru import logger from magic_pdf.config.ocr_content_type import ContentType from magic_pdf.libs.commons import fitz TYPE_INLINE_EQUATION = ContentType.InlineEquation TYPE_INTERLINE_EQUATION = ContentType.InterlineEquation def combine_chars_to_pymudict(block_dict, char_dict): """把block级别的pymupdf 结构里加入char结构.""" # 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充 char_map = {tuple(item['bbox']): item for item in char_dict} for i in range(len(block_dict)): # block block = block_dict[i] key = block['bbox'] char_dict_item = char_map[tuple(key)] char_dict_map = {tuple(item['bbox']): item for item in char_dict_item['lines']} for j in range(len(block['lines'])): lines = block['lines'][j] with_char_lines = char_dict_map[lines['bbox']] for k in range(len(lines['spans'])): spans = lines['spans'][k] try: chars = with_char_lines['spans'][k]['chars'] except Exception: logger.error(char_dict[i]['lines'][j]) spans['chars'] = chars return block_dict def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox): """计算box1和box2的重叠面积占最小面积的box的比例.""" # Determine the coordinates of the intersection rectangle x_left = max(bbox1[0], min_bbox[0]) y_top = max(bbox1[1], min_bbox[1]) x_right = min(bbox1[2], min_bbox[2]) y_bottom = min(bbox1[3], min_bbox[3]) if x_right < x_left or y_bottom < y_top: return 0.0 # The area of overlap area intersection_area = (x_right - x_left) * (y_bottom - y_top) min_box_area = (min_bbox[3] - min_bbox[1]) * (min_bbox[2] - min_bbox[0]) if min_box_area == 0: return 0 else: return intersection_area / min_box_area def _is_xin(bbox1, bbox2): area1 = abs(bbox1[2] - bbox1[0]) * abs(bbox1[3] - bbox1[1]) area2 = abs(bbox2[2] - bbox2[0]) * abs(bbox2[3] - bbox2[1]) if area1 < area2: ratio = calculate_overlap_area_2_minbox_area_ratio(bbox2, bbox1) else: ratio = calculate_overlap_area_2_minbox_area_ratio(bbox1, bbox2) return ratio > 0.6 def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks): """消除掉整个块都在行间公式块内部的文本块.""" for eq_bbox in interline_bboxes: removed_txt_blk = [] for text_blk in text_blocks: text_bbox = text_blk['bbox'] if ( calculate_overlap_area_2_minbox_area_ratio(eq_bbox['bbox'], text_bbox) >= 0.7 ): removed_txt_blk.append(text_blk) for blk in removed_txt_blk: text_blocks.remove(blk) return text_blocks def _is_in_or_part_overlap(box1, box2) -> bool: """两个bbox是否有部分重叠或者包含.""" if box1 is None or box2 is None: return False x0_1, y0_1, x1_1, y1_1 = box1 x0_2, y0_2, x1_2, y1_2 = box2 return not ( x1_1 < x0_2 # box1在box2的左边 or x0_1 > x1_2 # box1在box2的右边 or y1_1 < y0_2 # box1在box2的上边 or y0_1 > y1_2 ) # box1在box2的下边 def remove_text_block_overlap_interline_equation_bbox( interline_eq_bboxes, pymu_block_list ): """消除掉行行内公式有部分重叠的文本块的内容。 同时重新计算消除重叠之后文本块的大小.""" deleted_block = [] for text_block in pymu_block_list: deleted_line = [] for line in text_block['lines']: deleted_span = [] for span in line['spans']: deleted_chars = [] for char in span['chars']: if any( [ ( calculate_overlap_area_2_minbox_area_ratio( eq_bbox['bbox'], char['bbox'] ) > 0.5 ) for eq_bbox in interline_eq_bboxes ] ): deleted_chars.append(char) # 检查span里没有char则删除这个span for char in deleted_chars: span['chars'].remove(char) # 重新计算这个span的大小 if len(span['chars']) == 0: # 删除这个span deleted_span.append(span) else: span['bbox'] = ( min([b['bbox'][0] for b in span['chars']]), min([b['bbox'][1] for b in span['chars']]), max([b['bbox'][2] for b in span['chars']]), max([b['bbox'][3] for b in span['chars']]), ) # 检查这个span for span in deleted_span: line['spans'].remove(span) if len(line['spans']) == 0: # 删除这个line deleted_line.append(line) else: line['bbox'] = ( min([b['bbox'][0] for b in line['spans']]), min([b['bbox'][1] for b in line['spans']]), max([b['bbox'][2] for b in line['spans']]), max([b['bbox'][3] for b in line['spans']]), ) # 检查这个block是否可以删除 for line in deleted_line: text_block['lines'].remove(line) if len(text_block['lines']) == 0: # 删除block deleted_block.append(text_block) else: text_block['bbox'] = ( min([b['bbox'][0] for b in text_block['lines']]), min([b['bbox'][1] for b in text_block['lines']]), max([b['bbox'][2] for b in text_block['lines']]), max([b['bbox'][3] for b in text_block['lines']]), ) # 检查text block删除 for block in deleted_block: pymu_block_list.remove(block) if len(pymu_block_list) == 0: return [] return pymu_block_list def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list): """在行间公式对应的地方插上一个伪造的block.""" for eq in interline_eq_bboxes: bbox = eq['bbox'] latex_content = eq['latex'] text_block = { 'number': len(pymu_block_list), 'type': 0, 'bbox': bbox, 'lines': [ { 'spans': [ { 'size': 9.962599754333496, 'type': TYPE_INTERLINE_EQUATION, 'flags': 4, 'font': TYPE_INTERLINE_EQUATION, 'color': 0, 'ascender': 0.9409999847412109, 'descender': -0.3050000071525574, 'latex': latex_content, 'origin': [bbox[0], bbox[1]], 'bbox': bbox, } ], 'wmode': 0, 'dir': [1.0, 0.0], 'bbox': bbox, } ], } pymu_block_list.append(text_block) def x_overlap_ratio(box1, box2): a, _, c, _ = box1 e, _, g, _ = box2 # 计算重叠宽度 overlap_x = max(min(c, g) - max(a, e), 0) # 计算box1的宽度 width1 = g - e # 计算重叠比例 overlap_ratio = overlap_x / width1 if width1 != 0 else 0 return overlap_ratio def __is_x_dir_overlap(bbox1, bbox2): return not (bbox1[2] < bbox2[0] or bbox1[0] > bbox2[2]) def __y_overlap_ratio(box1, box2): """""" _, b, _, d = box1 _, f, _, h = box2 # 计算重叠高度 overlap_y = max(min(d, h) - max(b, f), 0) # 计算box1的高度 height1 = d - b # 计算重叠比例 overlap_ratio = overlap_y / height1 if height1 != 0 else 0 return overlap_ratio def replace_line_v2(eqinfo, line): """扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。 最后与这个x0,x1有相交的span0, span1内部进行分割。""" first_overlap_span = -1 first_overlap_span_idx = -1 last_overlap_span = -1 delete_chars = [] for i in range(0, len(line['spans'])): if 'chars' not in line['spans'][i]: continue if line['spans'][i].get('_type', None) is not None: continue # 忽略,因为已经是插入的伪造span公式了 for char in line['spans'][i]['chars']: if __is_x_dir_overlap(eqinfo['bbox'], char['bbox']): line_txt = '' for span in line['spans']: span_txt = '' for ch in span['chars']: span_txt = span_txt + ch['c'] span_txt = span_txt + '' line_txt = line_txt + span_txt if first_overlap_span_idx == -1: first_overlap_span = line['spans'][i] first_overlap_span_idx = i last_overlap_span = line['spans'][i] delete_chars.append(char) # 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多 if len(delete_chars) > 0: ch0_bbox = delete_chars[0]['bbox'] if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51: delete_chars.remove(delete_chars[0]) if len(delete_chars) > 0: ch0_bbox = delete_chars[-1]['bbox'] if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51: delete_chars.remove(delete_chars[-1]) # 计算x方向上被删除区间内的char的真实x0, x1 if len(delete_chars): x0, x1 = ( min([b['bbox'][0] for b in delete_chars]), max([b['bbox'][2] for b in delete_chars]), ) else: # logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}") return False # 删除位于x0, x1这两个中间的span delete_span = [] for span in line['spans']: span_box = span['bbox'] if x0 <= span_box[0] and span_box[2] <= x1: delete_span.append(span) for span in delete_span: line['spans'].remove(span) equation_span = { 'size': 9.962599754333496, 'type': TYPE_INLINE_EQUATION, 'flags': 4, 'font': TYPE_INLINE_EQUATION, 'color': 0, 'ascender': 0.9409999847412109, 'descender': -0.3050000071525574, 'latex': '', 'origin': [337.1410153102337, 216.0205245153934], 'bbox': eqinfo['bbox'], } # equation_span = line['spans'][0].copy() equation_span['latex'] = eqinfo['latex'] equation_span['bbox'] = [x0, equation_span['bbox'][1], x1, equation_span['bbox'][3]] equation_span['origin'] = [equation_span['bbox'][0], equation_span['bbox'][1]] equation_span['chars'] = delete_chars equation_span['type'] = TYPE_INLINE_EQUATION equation_span['_eq_bbox'] = eqinfo['bbox'] line['spans'].insert(first_overlap_span_idx + 1, equation_span) # 放入公式 # logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】") # 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置 first_span_chars = [ char for char in first_overlap_span['chars'] if (char['bbox'][2] + char['bbox'][0]) / 2 < x0 ] tail_span_chars = [ char for char in last_overlap_span['chars'] if (char['bbox'][0] + char['bbox'][2]) / 2 > x1 ] if len(first_span_chars) > 0: first_overlap_span['chars'] = first_span_chars first_overlap_span['text'] = ''.join([char['c'] for char in first_span_chars]) first_overlap_span['bbox'] = ( first_overlap_span['bbox'][0], first_overlap_span['bbox'][1], max([chr['bbox'][2] for chr in first_span_chars]), first_overlap_span['bbox'][3], ) # first_overlap_span['_type'] = "first" else: # 删掉 if first_overlap_span not in delete_span: line['spans'].remove(first_overlap_span) if len(tail_span_chars) > 0: min_of_tail_span_x0 = min([chr['bbox'][0] for chr in tail_span_chars]) min_of_tail_span_y0 = min([chr['bbox'][1] for chr in tail_span_chars]) max_of_tail_span_x1 = max([chr['bbox'][2] for chr in tail_span_chars]) max_of_tail_span_y1 = max([chr['bbox'][3] for chr in tail_span_chars]) if last_overlap_span == first_overlap_span: # 这个时候应该插入一个新的 tail_span_txt = ''.join([char['c'] for char in tail_span_chars]) # noqa: F841 last_span_to_insert = last_overlap_span.copy() last_span_to_insert['chars'] = tail_span_chars last_span_to_insert['text'] = ''.join( [char['c'] for char in tail_span_chars] ) if equation_span['bbox'][2] >= last_overlap_span['bbox'][2]: last_span_to_insert['bbox'] = ( min_of_tail_span_x0, min_of_tail_span_y0, max_of_tail_span_x1, max_of_tail_span_y1, ) else: last_span_to_insert['bbox'] = ( min([chr['bbox'][0] for chr in tail_span_chars]), last_overlap_span['bbox'][1], last_overlap_span['bbox'][2], last_overlap_span['bbox'][3], ) # 插入到公式对象之后 equation_idx = line['spans'].index(equation_span) line['spans'].insert(equation_idx + 1, last_span_to_insert) # 放入公式 else: # 直接修改原来的span last_overlap_span['chars'] = tail_span_chars last_overlap_span['text'] = ''.join([char['c'] for char in tail_span_chars]) last_overlap_span['bbox'] = ( min([chr['bbox'][0] for chr in tail_span_chars]), last_overlap_span['bbox'][1], last_overlap_span['bbox'][2], last_overlap_span['bbox'][3], ) else: # 删掉 if ( last_overlap_span not in delete_span and last_overlap_span != first_overlap_span ): line['spans'].remove(last_overlap_span) remain_txt = '' for span in line['spans']: span_txt = '' for char in span['chars']: span_txt = span_txt + char['c'] span_txt = span_txt + '' remain_txt = remain_txt + span_txt # logger.info(f"<== succ replace, text is 【{remain_txt}】, equation is 【{eqinfo['latex_text']}】") return True def replace_eq_blk(eqinfo, text_block): """替换行内公式.""" for line in text_block['lines']: line_bbox = line['bbox'] if ( _is_xin(eqinfo['bbox'], line_bbox) or __y_overlap_ratio(eqinfo['bbox'], line_bbox) > 0.6 ): # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄, replace_succ = replace_line_v2(eqinfo, line) if not replace_succ: # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行 continue else: break else: return False return True def replace_inline_equations(inline_equation_bboxes, raw_text_blocks): """替换行内公式.""" for eqinfo in inline_equation_bboxes: eqbox = eqinfo['bbox'] for blk in raw_text_blocks: if _is_xin(eqbox, blk['bbox']): if not replace_eq_blk(eqinfo, blk): logger.warning(f'行内公式没有替换成功:{eqinfo} ') else: break return raw_text_blocks def remove_chars_in_text_blocks(text_blocks): """删除text_blocks里的char.""" for blk in text_blocks: for line in blk['lines']: for span in line['spans']: _ = span.pop('chars', 'no such key') return text_blocks def replace_equations_in_textblock( raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes ): """替换行间和和行内公式为latex.""" raw_text_blocks = remove_text_block_in_interline_equation_bbox( interline_equation_bboxes, raw_text_blocks ) # 消除重叠:第一步,在公式内部的 raw_text_blocks = remove_text_block_overlap_interline_equation_bbox( interline_equation_bboxes, raw_text_blocks ) # 消重,第二步,和公式覆盖的 insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks) raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks) return raw_text_blocks def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path): """""" new_pdf = f'{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf' with open(json_path, 'r', encoding='utf-8') as f: obj = json.loads(f.read()) if os.path.exists(new_pdf): os.remove(new_pdf) new_doc = fitz.open('') doc = fitz.open(pdf_path) # noqa: F841 new_doc = fitz.open(pdf_path) for i in range(len(new_doc)): page = new_doc[i] inline_equation_bboxes = obj[f'page_{i}']['inline_equations'] interline_equation_bboxes = obj[f'page_{i}']['interline_equations'] raw_text_blocks = obj[f'page_{i}']['preproc_blocks'] raw_text_blocks = remove_text_block_in_interline_equation_bbox( interline_equation_bboxes, raw_text_blocks ) # 消除重叠:第一步,在公式内部的 raw_text_blocks = remove_text_block_overlap_interline_equation_bbox( interline_equation_bboxes, raw_text_blocks ) # 消重,第二步,和公式覆盖的 insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks) raw_text_blocks = replace_inline_equations( inline_equation_bboxes, raw_text_blocks ) # 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的 color_map = [fitz.pdfcolor['blue'], fitz.pdfcolor['green']] # noqa: F841 j = 0 # noqa: F841 for blk in raw_text_blocks: for i, line in enumerate(blk['lines']): # line_box = line['bbox'] # shape = page.new_shape() # shape.draw_rect(line_box) # shape.finish(color=fitz.pdfcolor['red'], fill=color_map[j%2], fill_opacity=0.3) # shape.commit() # j = j+1 for i, span in enumerate(line['spans']): shape_page = page.new_shape() span_type = span.get('_type') color = fitz.pdfcolor['blue'] if span_type == 'first': color = fitz.pdfcolor['blue'] elif span_type == 'tail': color = fitz.pdfcolor['green'] elif span_type == TYPE_INLINE_EQUATION: color = fitz.pdfcolor['black'] else: color = None b = span['bbox'] shape_page.draw_rect(b) shape_page.finish(color=None, fill=color, fill_opacity=0.3) shape_page.commit() new_doc.save(new_pdf) logger.info(f'save ok {new_pdf}') final_json = json.dumps(obj, ensure_ascii=False, indent=2) with open('equations_test/final_json.json', 'w') as f: f.write(final_json) return new_pdf if __name__ == '__main__': # draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf) pass