许瑞 1 рік тому
батько
коміт
b16599cdf2

+ 35 - 20
magic_pdf/cli/magicpdf.py

@@ -26,6 +26,8 @@ import json as json_parse
 from datetime import datetime
 import click
 from magic_pdf.pipe.UNIPipe import UNIPipe
+from magic_pdf.pipe.OCRPipe import OCRPipe
+from magic_pdf.pipe.TXTPipe import TXTPipe
 from magic_pdf.libs.config_reader import get_s3_config
 from magic_pdf.libs.path_utils import (
     parse_s3path,
@@ -33,9 +35,9 @@ from magic_pdf.libs.path_utils import (
     remove_non_official_s3_args,
 )
 from magic_pdf.libs.config_reader import get_local_dir
-from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter, MODE_BIN, MODE_TXT
+from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter
 from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
-
+from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
 
 parse_pdf_methods = click.Choice(["ocr", "txt", "auto"])
 
@@ -53,24 +55,34 @@ def prepare_env():
 
 
 def _do_parse(pdf_bytes, model_list, parse_method, image_writer, md_writer, image_dir):
-    uni_pipe = UNIPipe(pdf_bytes, model_list, image_writer, image_dir, is_debug=True)
-    jso_useful_key = {
-        "_pdf_type": "txt",
-        "model_list": model_list,
-    }
-    if parse_method == "ocr":
-        jso_useful_key["_pdf_type"] = "ocr"
-
-    uni_pipe.pipe_parse()
-    md_content = uni_pipe.pipe_mk_markdown()
+    """Run the pipe selected by ``parse_method`` and write its outputs.
+
+    ``"auto"`` uses UNIPipe, ``"txt"`` uses TXTPipe and ``"ocr"`` uses OCRPipe.
+    The pipe is classified and parsed, then up to three files named after the
+    current wall-clock time (``HH-MM-SS``) are written through ``md_writer``:
+    the markdown (``.md``), the intermediate data as JSON (``.json``) and the
+    stringified uni-format content list (``.txt``).
+
+    Raises:
+        SystemExit: with status 1 when ``parse_method`` is not recognised.
+    """
+    if parse_method == "auto":
+        pipe = UNIPipe(pdf_bytes, model_list, image_writer, image_dir, is_debug=True)
+    elif parse_method == "txt":
+        pipe = TXTPipe(pdf_bytes, model_list, image_writer, image_dir, is_debug=True)
+    elif parse_method == "ocr":
+        pipe = OCRPipe(pdf_bytes, model_list, image_writer, image_dir, is_debug=True)
+    else:
+        print("unknow parse method")
+        # The os module has no exit() function, so the old call raised
+        # AttributeError instead of terminating; SystemExit needs no import.
+        raise SystemExit(1)
+
+    pipe.pipe_classify()
+    pipe.pipe_parse()
+    md_content = pipe.pipe_mk_markdown()
     part_file_name = datetime.now().strftime("%H-%M-%S")
-    md_writer.write(content=md_content, path=f"{part_file_name}.md", mode=MODE_TXT)
     md_writer.write(
-        content=json_parse.dumps(
-            uni_pipe.pdf_mid_data, ensure_ascii=False, indent=4
-        ),
+        content=md_content, path=f"{part_file_name}.md", mode=AbsReaderWriter.MODE_TXT
+    )
+    md_writer.write(
+        content=json_parse.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4),
         path=f"{part_file_name}.json",
-        mode=MODE_TXT,
+        mode=AbsReaderWriter.MODE_TXT,
+    )
+    try:
+        content_list = pipe.pipe_mk_uni_format()
+    except Exception as e:
+        print(e)
+        # Best effort: without a content list there is nothing to write, and
+        # falling through would hit an unbound ``content_list`` (NameError).
+        return
+    md_writer.write(
+        str(content_list), f"{part_file_name}.txt", AbsReaderWriter.MODE_TXT
     )
 
 
@@ -106,7 +118,10 @@ def json_command(json, method):
             byte_start, byte_end = int(may_range_params[0]), int(may_range_params[1])
             byte_end += byte_start - 1
         return s3_rw.read_jsonl(
-            remove_non_official_s3_args(s3path), byte_start, byte_end, MODE_BIN
+            remove_non_official_s3_args(s3path),
+            byte_start,
+            byte_end,
+            AbsReaderWriter.MODE_BIN,
         )
 
     jso = json_parse.loads(read_s3_path(json).decode("utf-8"))
@@ -119,7 +134,7 @@ def json_command(json, method):
 
     _do_parse(
         pdf_data,
-        jso['doc_layout_result'],
+        jso["doc_layout_result"],
         method,
         local_image_rw,
         local_md_rw,
@@ -148,7 +163,7 @@ def pdf_command(pdf, model, method):
 
     def read_fn(path):
         disk_rw = DiskReaderWriter(os.path.dirname(path))
-        return disk_rw.read(os.path.basename(path), MODE_BIN)
+        return disk_rw.read(os.path.basename(path), AbsReaderWriter.MODE_BIN)
 
     pdf_data = read_fn(pdf)
     jso = json_parse.loads(read_fn(model).decode("utf-8"))

+ 214 - 0
magic_pdf/pdf_parse_by_txt_v2.py

@@ -0,0 +1,214 @@
+import time
+
+from loguru import logger
+
+from magic_pdf.layout.layout_sort import get_bboxes_layout
+from magic_pdf.libs.convert_utils import dict_to_list
+from magic_pdf.libs.hash_utils import compute_md5
+from magic_pdf.libs.commons import fitz, get_delta_time
+from magic_pdf.model.magic_model import MagicModel
+from magic_pdf.pre_proc.construct_page_dict import ocr_construct_page_component_v2
+from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
+from magic_pdf.pre_proc.ocr_detect_all_bboxes import ocr_prepare_bboxes_for_layout_split
+from magic_pdf.pre_proc.ocr_dict_merge import (
+    sort_blocks_by_layout,
+    fill_spans_in_blocks,
+    fix_block_spans,
+)
+from magic_pdf.libs.ocr_content_type import ContentType
+from magic_pdf.pre_proc.ocr_span_list_modify import (
+    remove_overlaps_min_spans,
+    get_qa_need_list_v2,
+)
+from magic_pdf.pre_proc.equations_replace import (
+    combine_chars_to_pymudict,
+    remove_chars_in_text_blocks,
+    replace_equations_in_textblock,
+)
+from magic_pdf.pre_proc.equations_replace import (
+    combine_chars_to_pymudict,
+    remove_chars_in_text_blocks,
+    replace_equations_in_textblock,
+)
+from magic_pdf.pre_proc.citationmarker_remove import remove_citation_marker
+
+
+def txt_spans_extract(pdf_page, inline_equations, interline_equations):
+    """Extract flat text spans from ``pdf_page`` with equations substituted.
+
+    The page text is read twice ("dict" and "rawdict"), merged so that spans
+    carry their chars, passed through equation replacement and citation-marker
+    removal, stripped of the char lists again, and finally flattened into a
+    list of ``{"bbox", "content", "type"}`` dicts typed as ContentType.Text.
+    """
+    text_flags = fitz.TEXTFLAGS_TEXT
+    block_level = pdf_page.get_text("dict", flags=text_flags)["blocks"]
+    char_level = pdf_page.get_text("rawdict", flags=text_flags)["blocks"]
+
+    blocks = combine_chars_to_pymudict(block_level, char_level)
+    blocks = replace_equations_in_textblock(
+        blocks, inline_equations, interline_equations
+    )
+    blocks = remove_citation_marker(blocks)
+    blocks = remove_chars_in_text_blocks(blocks)
+
+    # Flatten block -> line -> span, preserving the original traversal order.
+    return [
+        {
+            "bbox": list(span["bbox"]),
+            "content": span["text"],
+            "type": ContentType.Text,
+        }
+        for block in blocks
+        for line in block["lines"]
+        for span in line["spans"]
+    ]
+
+
+def replace_text_span(pymu_spans, ocr_spans):
+    """Return the non-text OCR spans followed by all of ``pymu_spans``.
+
+    Every span in ``ocr_spans`` whose ``"type"`` equals ContentType.Text is
+    dropped; the remaining ones keep their order and the PyMuPDF-derived
+    spans are appended after them.
+    """
+    non_text_spans = [
+        span for span in ocr_spans if span["type"] != ContentType.Text
+    ]
+    return non_text_spans + pymu_spans
+
+
+def parse_pdf_by_txt(
+    pdf_bytes,
+    model_list,
+    imageWriter,
+    start_page_id=0,
+    end_page_id=None,
+    debug_mode=False,
+):
+    """Parse a text-based PDF page by page into a ``{"pdf_info": [...]}`` dict.
+
+    Args:
+        pdf_bytes: Raw PDF content; opened with fitz and hashed with MD5.
+        model_list: Model output handed to MagicModel together with the document.
+        imageWriter: Writer passed to ``ocr_cut_image_and_table`` for the crops.
+        start_page_id: Index of the first page to parse.
+        end_page_id: Index of the last page to parse (inclusive). ``None``
+            means the last page of the document.
+        debug_mode: When true, log the time spent since the previous page.
+
+    Returns:
+        A dict with one key, ``"pdf_info"``, holding the per-page results
+        converted from a dict to a list by ``dict_to_list``.
+    """
+    pdf_bytes_md5 = compute_md5(pdf_bytes)
+    pdf_docs = fitz.open("pdf", pdf_bytes)
+
+    # Start with an empty pdf_info_dict.
+    pdf_info_dict = {}
+
+    # Build magic_model from model_list and the opened document.
+    magic_model = MagicModel(model_list, pdf_docs)
+
+    # Resolve the page range to parse. Compare against None explicitly: a
+    # truthiness test would treat end_page_id=0 ("first page only") as
+    # "not given" and parse the whole document.
+    end_page_id = end_page_id if end_page_id is not None else len(pdf_docs) - 1
+
+    # Reference point for the per-page timing log.
+    start_time = time.time()
+
+    for page_id in range(start_page_id, end_page_id + 1):
+
+        # In debug mode, log how long the previous page took.
+        if debug_mode:
+            time_now = time.time()
+            logger.info(
+                f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}"
+            )
+            start_time = time_now
+
+        # Fetch the block information used below from magic_model.
+        img_blocks = magic_model.get_imgs(page_id)
+        table_blocks = magic_model.get_tables(page_id)
+        discarded_blocks = magic_model.get_discarded(page_id)
+        text_blocks = magic_model.get_text_blocks(page_id)
+        title_blocks = magic_model.get_title_blocks(page_id)
+        inline_equations, interline_equations, interline_equation_blocks = (
+            magic_model.get_equations(page_id)
+        )
+
+        page_w, page_h = magic_model.get_page_size(page_id)
+
+        # Gather the bboxes of all blocks in one place.
+        all_bboxes = ocr_prepare_bboxes_for_layout_split(
+            img_blocks,
+            table_blocks,
+            discarded_blocks,
+            text_blocks,
+            title_blocks,
+            interline_equation_blocks,
+            page_w,
+            page_h,
+        )
+
+        # Compute the layout from the block information.
+        page_boundry = [0, 0, page_w, page_h]
+        layout_bboxes, layout_tree = get_bboxes_layout(
+            all_bboxes, page_boundry, page_id
+        )
+
+        # Sort every block kept on this page according to the layout order.
+        sorted_blocks = sort_blocks_by_layout(all_bboxes, layout_bboxes)
+
+        # Replace the text-type spans of the OCR result with PyMuPDF spans.
+        ocr_spans = magic_model.get_all_spans(page_id)
+        pymu_spans = txt_spans_extract(
+            pdf_docs[page_id], inline_equations, interline_equations
+        )
+        spans = replace_text_span(pymu_spans, ocr_spans)
+
+        # Among overlapping spans, drop the smaller ones (the dropped list is
+        # returned by the helper but not used here).
+        spans, _dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)
+        # Crop images and tables.
+        spans = ocr_cut_image_and_table(
+            spans, pdf_docs[page_id], page_id, pdf_bytes_md5, imageWriter
+        )
+
+        # Fill the spans into the sorted blocks.
+        block_with_spans = fill_spans_in_blocks(sorted_blocks, spans)
+
+        # Apply the fix step to the blocks.
+        fix_blocks = fix_block_spans(block_with_spans, img_blocks, table_blocks)
+
+        # Collect the lists that QA needs exposed separately.
+        images, tables, interline_equations = get_qa_need_list_v2(fix_blocks)
+
+        # Build this page's entry of pdf_info_dict.
+        page_info = ocr_construct_page_component_v2(
+            fix_blocks,
+            layout_bboxes,
+            page_id,
+            page_w,
+            page_h,
+            layout_tree,
+            images,
+            tables,
+            interline_equations,
+            discarded_blocks,
+        )
+        pdf_info_dict[f"page_{page_id}"] = page_info
+
+    # Paragraph splitting: placeholder, nothing is done here yet.
+
+    # Convert the dict to a list.
+    pdf_info_list = dict_to_list(pdf_info_dict)
+    new_pdf_info_dict = {
+        "pdf_info": pdf_info_list,
+    }
+
+    return new_pdf_info_dict
+
+
+# Ad-hoc debugging entry point: loads one hard-coded PDF and its model output,
+# prints the image blocks of the first pages, then replays the text-block
+# preprocessing steps page by page. Nothing is written or returned.
+if __name__ == "__main__":
+    if 1:
+        import fitz
+        import json
+
+        with open("/opt/data/pdf/20240418/25536-00.pdf", "rb") as f:
+            pdf_bytes = f.read()
+        pdf_docs = fitz.open("pdf", pdf_bytes)
+
+        # Only the first line of the file is parsed as JSON.
+        with open("/opt/data/pdf/20240418/25536-00.json") as f:
+            model_list = json.loads(f.readline())
+
+        magic_model = MagicModel(model_list, pdf_docs)
+        # Page indices 0..6 are hard-coded here.
+        # NOTE(review): presumably matches the page count of this sample PDF - confirm.
+        for i in range(7):
+            print(magic_model.get_imgs(i))
+
+        for page_no, page in enumerate(pdf_docs):
+            inline_equations, interline_equations, interline_equation_blocks = (
+                magic_model.get_equations(page_no)
+            )
+
+            # Same sequence of steps as txt_spans_extract, minus the final
+            # flattening into spans; the result is discarded.
+            text_raw_blocks = page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"]
+            char_level_text_blocks = page.get_text(
+                "rawdict", flags=fitz.TEXTFLAGS_TEXT
+            )["blocks"]
+            text_blocks = combine_chars_to_pymudict(
+                text_raw_blocks, char_level_text_blocks
+            )
+            text_blocks = replace_equations_in_textblock(
+                text_blocks, inline_equations, interline_equations
+            )
+            text_blocks = remove_citation_marker(text_blocks)
+
+            text_blocks = remove_chars_in_text_blocks(text_blocks)

+ 284 - 219
magic_pdf/pre_proc/equations_replace.py

@@ -1,6 +1,7 @@
 """
 对pymupdf返回的结构里的公式进行替换,替换为模型识别的公式结果
 """
+
 from magic_pdf.libs.commons import fitz
 import json
 import os
@@ -17,25 +18,25 @@ def combine_chars_to_pymudict(block_dict, char_dict):
     把block级别的pymupdf 结构里加入char结构
     """
     # 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充
-    char_map = {tuple(item['bbox']):item for item in char_dict}
-    
-    for i in range(len(block_dict)): # blcok
+    char_map = {tuple(item["bbox"]): item for item in char_dict}
+
+    for i in range(len(block_dict)):  # blcok
         block = block_dict[i]
-        key = block['bbox']
+        key = block["bbox"]
         char_dict_item = char_map[tuple(key)]
-        char_dict_map = {tuple(item['bbox']):item for item in char_dict_item['lines']}
-        for j in range(len(block['lines'])):
-            lines = block['lines'][j]
-            with_char_lines = char_dict_map[lines['bbox']]
-            for k in range(len(lines['spans'])):
-                spans = lines['spans'][k]
+        char_dict_map = {tuple(item["bbox"]): item for item in char_dict_item["lines"]}
+        for j in range(len(block["lines"])):
+            lines = block["lines"][j]
+            with_char_lines = char_dict_map[lines["bbox"]]
+            for k in range(len(lines["spans"])):
+                spans = lines["spans"][k]
                 try:
-                    chars = with_char_lines['spans'][k]['chars']
+                    chars = with_char_lines["spans"][k]["chars"]
                 except Exception as e:
-                    logger.error(char_dict[i]['lines'][j])
-                    
-                spans['chars'] = chars
-                
+                    logger.error(char_dict[i]["lines"][j])
+
+                spans["chars"] = chars
+
     return block_dict
 
 
@@ -54,23 +55,22 @@ def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox):
 
     # The area of overlap area
     intersection_area = (x_right - x_left) * (y_bottom - y_top)
-    min_box_area = (min_bbox[3]-min_bbox[1])*(min_bbox[2]-min_bbox[0])
-    if min_box_area==0:
+    min_box_area = (min_bbox[3] - min_bbox[1]) * (min_bbox[2] - min_bbox[0])
+    if min_box_area == 0:
         return 0
     else:
         return intersection_area / min_box_area
-    
+
 
 def _is_xin(bbox1, bbox2):
-    area1 = abs(bbox1[2]-bbox1[0])*abs(bbox1[3]-bbox1[1])
-    area2 = abs(bbox2[2]-bbox2[0])*abs(bbox2[3]-bbox2[1])
-    if area1<area2:
+    # Return True when the overlap ratio computed relative to the smaller of
+    # the two boxes exceeds 0.6. Boxes are indexed [0..3]; presumably
+    # (x0, y0, x1, y1) - confirm against callers.
+    area1 = abs(bbox1[2] - bbox1[0]) * abs(bbox1[3] - bbox1[1])
+    area2 = abs(bbox2[2] - bbox2[0]) * abs(bbox2[3] - bbox2[1])
+    # The smaller-area box is always passed as the second (min_bbox) argument.
+    if area1 < area2:
         ratio = calculate_overlap_area_2_minbox_area_ratio(bbox2, bbox1)
     else:
         ratio = calculate_overlap_area_2_minbox_area_ratio(bbox1, bbox2)
 
-    return ratio>0.6
-
+    return ratio > 0.6
 
 
 def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
@@ -78,8 +78,11 @@ def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
     for eq_bbox in interline_bboxes:
         removed_txt_blk = []
         for text_blk in text_blocks:
-            text_bbox = text_blk['bbox']
-            if calculate_overlap_area_2_minbox_area_ratio(eq_bbox['bbox'], text_bbox)>=0.7:
+            text_bbox = text_blk["bbox"]
+            if (
+                calculate_overlap_area_2_minbox_area_ratio(eq_bbox["bbox"], text_bbox)
+                >= 0.7
+            ):
                 removed_txt_blk.append(text_blk)
         for blk in removed_txt_blk:
             text_blocks.remove(blk)
@@ -87,65 +90,88 @@ def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
     return text_blocks
 
 
-
 def _is_in_or_part_overlap(box1, box2) -> bool:
     """
     两个bbox是否有部分重叠或者包含
     """
     if box1 is None or box2 is None:
         return False
-    
+
     x0_1, y0_1, x1_1, y1_1 = box1
     x0_2, y0_2, x1_2, y1_2 = box2
 
-    return not (x1_1 < x0_2 or  # box1在box2的左边
-                x0_1 > x1_2 or  # box1在box2的右边
-                y1_1 < y0_2 or  # box1在box2的上边
-                y0_1 > y1_2)    # box1在box2的下边
+    # The boxes overlap unless one lies strictly outside the other on some
+    # side; the comparisons are strict, so boxes that merely touch count as
+    # overlapping.
+    return not (
+        x1_1 < x0_2  # box1 is to the left of box2
+        or x0_1 > x1_2  # box1 is to the right of box2
+        or y1_1 < y0_2  # box1 is above box2
+        or y0_1 > y1_2  # box1 is below box2
+    )
 
 
-def remove_text_block_overlap_interline_equation_bbox(interline_eq_bboxes, pymu_block_list):
+def remove_text_block_overlap_interline_equation_bbox(
+    interline_eq_bboxes, pymu_block_list
+):
     """消除掉行行内公式有部分重叠的文本块的内容。
     同时重新计算消除重叠之后文本块的大小"""
+    # Works bottom-up and mutates pymu_block_list in place: chars overlapping
+    # any equation bbox are removed, then spans, lines and blocks left empty
+    # are removed, and each surviving container's bbox is recomputed from the
+    # children it still has. Removals are collected first and applied after
+    # each inner loop so the lists are not modified while being iterated.
     deleted_block = []
     for text_block in pymu_block_list:
         deleted_line = []
-        for line in text_block['lines']:
+        for line in text_block["lines"]:
             deleted_span = []
-            for span in line['spans']:
+            for span in line["spans"]:
                 deleted_chars = []
-                for char in span['chars']:
-                    if any([_is_in_or_part_overlap(char['bbox'], eq_bbox['bbox']) for eq_bbox in interline_eq_bboxes]):
+                for char in span["chars"]:
+                    if any(
+                        [
+                            _is_in_or_part_overlap(char["bbox"], eq_bbox["bbox"])
+                            for eq_bbox in interline_eq_bboxes
+                        ]
+                    ):
                         deleted_chars.append(char)
                 # 检查span里没有char则删除这个span
                 for char in deleted_chars:
-                    span['chars'].remove(char)
+                    span["chars"].remove(char)
                 # 重新计算这个span的大小
-                if len(span['chars'])==0: # 删除这个span
+                if len(span["chars"]) == 0:  # drop this span
                     deleted_span.append(span)
                 else:
-                    span['bbox'] = min([b['bbox'][0] for b in span['chars']]),min([b['bbox'][1] for b in span['chars']]),max([b['bbox'][2] for b in span['chars']]), max([b['bbox'][3] for b in span['chars']])
-                    
+                    # Tight bbox around the chars that remain in the span.
+                    span["bbox"] = (
+                        min([b["bbox"][0] for b in span["chars"]]),
+                        min([b["bbox"][1] for b in span["chars"]]),
+                        max([b["bbox"][2] for b in span["chars"]]),
+                        max([b["bbox"][3] for b in span["chars"]]),
+                    )
+
             # 检查这个span
             for span in deleted_span:
-                line['spans'].remove(span)
-            if len(line['spans'])==0: #删除这个line
+                line["spans"].remove(span)
+            if len(line["spans"]) == 0:  # drop this line
                 deleted_line.append(line)
             else:
-                line['bbox'] = min([b['bbox'][0] for b in line['spans']]),min([b['bbox'][1] for b in line['spans']]),max([b['bbox'][2] for b in line['spans']]), max([b['bbox'][3] for b in line['spans']])
+                # Tight bbox around the spans that remain in the line.
+                line["bbox"] = (
+                    min([b["bbox"][0] for b in line["spans"]]),
+                    min([b["bbox"][1] for b in line["spans"]]),
+                    max([b["bbox"][2] for b in line["spans"]]),
+                    max([b["bbox"][3] for b in line["spans"]]),
+                )
 
         # 检查这个block是否可以删除
         for line in deleted_line:
-            text_block['lines'].remove(line)
-        if len(text_block['lines'])==0: # 删除block
+            text_block["lines"].remove(line)
+        if len(text_block["lines"]) == 0:  # drop this block
             deleted_block.append(text_block)
         else:
-            text_block['bbox'] = min([b['bbox'][0] for b in text_block['lines']]),min([b['bbox'][1] for b in text_block['lines']]),max([b['bbox'][2] for b in text_block['lines']]), max([b['bbox'][3] for b in text_block['lines']])
+            # Tight bbox around the lines that remain in the block.
+            text_block["bbox"] = (
+                min([b["bbox"][0] for b in text_block["lines"]]),
+                min([b["bbox"][1] for b in text_block["lines"]]),
+                max([b["bbox"][2] for b in text_block["lines"]]),
+                max([b["bbox"][3] for b in text_block["lines"]]),
+            )
 
     # 检查text block删除
     for block in deleted_block:
         pymu_block_list.remove(block)
-    if len(pymu_block_list)==0:
+    # An emptied input yields a fresh empty list rather than the input object.
+    if len(pymu_block_list) == 0:
         return []
 
     return pymu_block_list
@@ -154,49 +180,44 @@ def remove_text_block_overlap_interline_equation_bbox(interline_eq_bboxes, pymu_
 def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
     """在行间公式对应的地方插上一个伪造的block"""
+    # For every interline equation, append (in place) one synthetic text block
+    # to pymu_block_list. The block has a single line with a single span whose
+    # text is the equation's LaTeX wrapped in "$$" delimiters; block, line and
+    # span all reuse the equation's bbox. Nothing is returned.
     for eq in interline_eq_bboxes:
-        bbox = eq['bbox']
-        latex_content = eq['latex_text']
+        bbox = eq["bbox"]
+        # This commit switches the LaTeX lookup key from "latex_text" to "latex".
+        latex_content = eq["latex"]
         text_block = {
-                "number": len(pymu_block_list),
-                "type": 0,
-                "bbox": bbox,
-                "lines": [
-                    {
-                        "spans": [
-                            {
-                                "size": 9.962599754333496,
-                                "_type": TYPE_INTERLINE_EQUATION,
-                                "flags": 4,
-                                "font": TYPE_INTERLINE_EQUATION,
-                                "color": 0,
-                                "ascender": 0.9409999847412109,
-                                "descender": -0.3050000071525574,
-                                "text": f"\n$$\n{latex_content}\n$$\n",
-                                "origin": [
-                                    bbox[0],
-                                    bbox[1]
-                                ],
-                                "bbox": bbox
-                            }
-                        ],
-                        "wmode": 0,
-                        "dir": [
-                            1.0,
-                            0.0
-                        ],
-                        "bbox": bbox
-                    }
-                ]
-            }
+            # The running length of the list serves as the block number.
+            "number": len(pymu_block_list),
+            "type": 0,
+            "bbox": bbox,
+            "lines": [
+                {
+                    "spans": [
+                        {
+                            # size/flags/ascender/descender are fixed constants.
+                            # NOTE(review): presumably copied from a sample
+                            # PyMuPDF span - confirm.
+                            "size": 9.962599754333496,
+                            "_type": TYPE_INTERLINE_EQUATION,
+                            "flags": 4,
+                            "font": TYPE_INTERLINE_EQUATION,
+                            "color": 0,
+                            "ascender": 0.9409999847412109,
+                            "descender": -0.3050000071525574,
+                            "text": f"\n$$\n{latex_content}\n$$\n",
+                            "origin": [bbox[0], bbox[1]],
+                            "bbox": bbox,
+                        }
+                    ],
+                    "wmode": 0,
+                    "dir": [1.0, 0.0],
+                    "bbox": bbox,
+                }
+            ],
+        }
         pymu_block_list.append(text_block)
-        
+
+
 def x_overlap_ratio(box1, box2):
     a, _, c, _ = box1
     e, _, g, _ = box2
 
     # 计算重叠宽度
     overlap_x = max(min(c, g) - max(a, e), 0)
-    
+
     # 计算box1的宽度
     width1 = g - e
 
@@ -205,8 +226,10 @@ def x_overlap_ratio(box1, box2):
 
     return overlap_ratio
 
+
 def __is_x_dir_overlap(bbox1, bbox2):
-    return not (bbox1[2]<bbox2[0] or bbox1[0]>bbox2[2])
+    # True unless one box's [0]..[2] extent lies strictly before or after the
+    # other's; extents that merely touch count as overlapping.
+    return not (bbox1[2] < bbox2[0] or bbox1[0] > bbox2[2])
+
 
 def __y_overlap_ratio(box1, box2):
     """"""
@@ -215,7 +238,7 @@ def __y_overlap_ratio(box1, box2):
 
     # 计算重叠高度
     overlap_y = max(min(d, h) - max(b, f), 0)
-    
+
     # 计算box1的高度
     height1 = d - b
 
@@ -223,9 +246,10 @@ def __y_overlap_ratio(box1, box2):
     overlap_ratio = overlap_y / height1 if height1 != 0 else 0
 
     return overlap_ratio
-    
+
+
 def replace_line_v2(eqinfo, line):
-    """    
+    """
     扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。
     最后与这个x0,x1有相交的span0, span1内部进行分割。
     """
@@ -233,142 +257,172 @@ def replace_line_v2(eqinfo, line):
     first_overlap_span_idx = -1
     last_overlap_span = -1
     delete_chars = []
-    for i in range(0, len(line['spans'])):
-        if line['spans'][i].get("_type", None) is not None:
-            continue # 忽略,因为已经是插入的伪造span公式了
-        
-        for char in line['spans'][i]['chars']:
-            if __is_x_dir_overlap(eqinfo['bbox'], char['bbox']):
+    for i in range(0, len(line["spans"])):
+        if line["spans"][i].get("_type", None) is not None:
+            continue  # 忽略,因为已经是插入的伪造span公式了
+
+        for char in line["spans"][i]["chars"]:
+            if __is_x_dir_overlap(eqinfo["bbox"], char["bbox"]):
                 line_txt = ""
-                for span in line['spans']:
+                for span in line["spans"]:
                     span_txt = "<span>"
-                    for ch in span['chars']:
-                        span_txt = span_txt + ch['c']
+                    for ch in span["chars"]:
+                        span_txt = span_txt + ch["c"]
 
                     span_txt = span_txt + "</span>"
 
                     line_txt = line_txt + span_txt
-                    
+
                 if first_overlap_span_idx == -1:
-                    first_overlap_span = line['spans'][i]
+                    first_overlap_span = line["spans"][i]
                     first_overlap_span_idx = i
-                last_overlap_span = line['spans'][i]
+                last_overlap_span = line["spans"][i]
                 delete_chars.append(char)
 
     # 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多
-    if len(delete_chars)>0:
-        ch0_bbox = delete_chars[0]['bbox']
-        if x_overlap_ratio(eqinfo['bbox'], ch0_bbox)<0.51:
+    if len(delete_chars) > 0:
+        ch0_bbox = delete_chars[0]["bbox"]
+        if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
             delete_chars.remove(delete_chars[0])
-    if len(delete_chars)>0:
-        ch0_bbox = delete_chars[-1]['bbox']
-        if x_overlap_ratio(eqinfo['bbox'], ch0_bbox)<0.51:
+    if len(delete_chars) > 0:
+        ch0_bbox = delete_chars[-1]["bbox"]
+        if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
             delete_chars.remove(delete_chars[-1])
-            
+
     # 计算x方向上被删除区间内的char的真实x0, x1
     if len(delete_chars):
-        x0, x1 = min([b['bbox'][0] for b in delete_chars]), max([b['bbox'][2] for b in delete_chars])
+        x0, x1 = min([b["bbox"][0] for b in delete_chars]), max(
+            [b["bbox"][2] for b in delete_chars]
+        )
     else:
         logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}")
         return False
-        
+
     # 删除位于x0, x1这两个中间的span
     delete_span = []
-    for span in line['spans']:
-        span_box = span['bbox']
-        if x0<=span_box[0] and span_box[2]<=x1:
+    for span in line["spans"]:
+        span_box = span["bbox"]
+        if x0 <= span_box[0] and span_box[2] <= x1:
             delete_span.append(span)
     for span in delete_span:
-        line['spans'].remove(span)
-
+        line["spans"].remove(span)
 
     equation_span = {
-                                "size": 9.962599754333496,
-                                "_type": TYPE_INLINE_EQUATION,
-                                "flags": 4,
-                                "font": TYPE_INLINE_EQUATION,
-                                "color": 0,
-                                "ascender": 0.9409999847412109,
-                                "descender": -0.3050000071525574,
-                                "text": "",
-                                "origin": [
-                                    337.1410153102337,
-                                    216.0205245153934
-                                ],
-                                "bbox": [
-                                    337.1410153102337,
-                                    216.0205245153934,
-                                    390.4496373892022,
-                                    228.50171037628277
-                                ]
-                            }
-    #equation_span = line['spans'][0].copy()
-    equation_span['text'] = f" ${eqinfo['latex_text']}$ "
-    equation_span['bbox'] = [x0, equation_span['bbox'][1], x1, equation_span['bbox'][3]] 
-    equation_span['origin'] = [equation_span['bbox'][0], equation_span['bbox'][1]]
-    equation_span['chars'] = delete_chars
-    equation_span['_type'] = TYPE_INLINE_EQUATION
-    equation_span['_eq_bbox'] = eqinfo['bbox']
-    line['spans'].insert(first_overlap_span_idx+1, equation_span) # 放入公式
+        "size": 9.962599754333496,
+        "_type": TYPE_INLINE_EQUATION,
+        "flags": 4,
+        "font": TYPE_INLINE_EQUATION,
+        "color": 0,
+        "ascender": 0.9409999847412109,
+        "descender": -0.3050000071525574,
+        "text": "",
+        "origin": [337.1410153102337, 216.0205245153934],
+        "bbox": [
+            337.1410153102337,
+            216.0205245153934,
+            390.4496373892022,
+            228.50171037628277,
+        ],
+    }
+    # equation_span = line['spans'][0].copy()
+    equation_span["text"] = f" ${eqinfo['latex']}$ "
+    equation_span["bbox"] = [x0, equation_span["bbox"][1], x1, equation_span["bbox"][3]]
+    equation_span["origin"] = [equation_span["bbox"][0], equation_span["bbox"][1]]
+    equation_span["chars"] = delete_chars
+    equation_span["_type"] = TYPE_INLINE_EQUATION
+    equation_span["_eq_bbox"] = eqinfo["bbox"]
+    line["spans"].insert(first_overlap_span_idx + 1, equation_span)  # 放入公式
 
     # logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】")
-    
+
     # 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置
-    first_span_chars = [char for char in first_overlap_span['chars'] if (char['bbox'][2]+char['bbox'][0])/2<x0]
-    tail_span_chars  = [char for char in last_overlap_span['chars'] if (char['bbox'][0]+char['bbox'][2])/2>x1]
-    
-    if len(first_span_chars)>0:
-        first_overlap_span['chars'] = first_span_chars
-        first_overlap_span['text'] = ''.join([char['c'] for char in first_span_chars])
-        first_overlap_span['bbox'] = (first_overlap_span['bbox'][0], first_overlap_span['bbox'][1], max([chr['bbox'][2] for chr in first_span_chars]), first_overlap_span['bbox'][3])
+    first_span_chars = [
+        char
+        for char in first_overlap_span["chars"]
+        if (char["bbox"][2] + char["bbox"][0]) / 2 < x0
+    ]
+    tail_span_chars = [
+        char
+        for char in last_overlap_span["chars"]
+        if (char["bbox"][0] + char["bbox"][2]) / 2 > x1
+    ]
+
+    if len(first_span_chars) > 0:
+        first_overlap_span["chars"] = first_span_chars
+        first_overlap_span["text"] = "".join([char["c"] for char in first_span_chars])
+        first_overlap_span["bbox"] = (
+            first_overlap_span["bbox"][0],
+            first_overlap_span["bbox"][1],
+            max([chr["bbox"][2] for chr in first_span_chars]),
+            first_overlap_span["bbox"][3],
+        )
         # first_overlap_span['_type'] = "first"
     else:
         # 删掉
         if first_overlap_span not in delete_span:
-            line['spans'].remove(first_overlap_span)
-        
-
-    if len(tail_span_chars)>0:
-        if last_overlap_span==first_overlap_span: # 这个时候应该插入一个新的
-            tail_span_txt = ''.join([char['c'] for char in tail_span_chars])
-            last_span_to_insert =  last_overlap_span.copy()
-            last_span_to_insert['chars'] = tail_span_chars
-            last_span_to_insert['text'] = ''.join([char['c'] for char in tail_span_chars])
-            last_span_to_insert['bbox'] = (min([chr['bbox'][0] for chr in tail_span_chars]), last_overlap_span['bbox'][1], last_overlap_span['bbox'][2], last_overlap_span['bbox'][3])
+            line["spans"].remove(first_overlap_span)
+
+    if len(tail_span_chars) > 0:
+        if last_overlap_span == first_overlap_span:  # 这个时候应该插入一个新的
+            tail_span_txt = "".join([char["c"] for char in tail_span_chars])
+            last_span_to_insert = last_overlap_span.copy()
+            last_span_to_insert["chars"] = tail_span_chars
+            last_span_to_insert["text"] = "".join(
+                [char["c"] for char in tail_span_chars]
+            )
+            last_span_to_insert["bbox"] = (
+                min([chr["bbox"][0] for chr in tail_span_chars]),
+                last_overlap_span["bbox"][1],
+                last_overlap_span["bbox"][2],
+                last_overlap_span["bbox"][3],
+            )
             # 插入到公式对象之后
-            equation_idx = line['spans'].index(equation_span)
-            line['spans'].insert(equation_idx+1, last_span_to_insert) # 放入公式
-        else: # 直接修改原来的span
-            last_overlap_span['chars'] = tail_span_chars
-            last_overlap_span['text'] = ''.join([char['c'] for char in tail_span_chars])
-            last_overlap_span['bbox'] = (min([chr['bbox'][0] for chr in tail_span_chars]), last_overlap_span['bbox'][1], last_overlap_span['bbox'][2], last_overlap_span['bbox'][3])
+            equation_idx = line["spans"].index(equation_span)
+            line["spans"].insert(equation_idx + 1, last_span_to_insert)  # 放入公式
+        else:  # 直接修改原来的span
+            last_overlap_span["chars"] = tail_span_chars
+            last_overlap_span["text"] = "".join([char["c"] for char in tail_span_chars])
+            last_overlap_span["bbox"] = (
+                min([chr["bbox"][0] for chr in tail_span_chars]),
+                last_overlap_span["bbox"][1],
+                last_overlap_span["bbox"][2],
+                last_overlap_span["bbox"][3],
+            )
     else:
         # 删掉
-        if last_overlap_span not in delete_span and last_overlap_span!=first_overlap_span:
-            line['spans'].remove(last_overlap_span)
-            
+        if (
+            last_overlap_span not in delete_span
+            and last_overlap_span != first_overlap_span
+        ):
+            line["spans"].remove(last_overlap_span)
+
     remain_txt = ""
-    for span in line['spans']:
+    for span in line["spans"]:
         span_txt = "<span>"
-        for char in span['chars']:
-            span_txt = span_txt + char['c']
+        for char in span["chars"]:
+            span_txt = span_txt + char["c"]
 
         span_txt = span_txt + "</span>"
 
         remain_txt = remain_txt + span_txt
-    
+
     # logger.info(f"<== succ replace, text is 【{remain_txt}】, equation is 【{eqinfo['latex_text']}】")
-    
+
     return True
 
 
 def replace_eq_blk(eqinfo, text_block):
     """替换行内公式"""
-    for line in text_block['lines']:
-        line_bbox = line['bbox']
-        if _is_xin(eqinfo['bbox'], line_bbox) or __y_overlap_ratio(eqinfo['bbox'], line_bbox)>0.6: # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄,
-            replace_succ =  replace_line_v2(eqinfo, line)
-            if not replace_succ: # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
+    for line in text_block["lines"]:
+        line_bbox = line["bbox"]
+        if (
+            _is_xin(eqinfo["bbox"], line_bbox)
+            or __y_overlap_ratio(eqinfo["bbox"], line_bbox) > 0.6
+        ):  # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄,
+            replace_succ = replace_line_v2(eqinfo, line)
+            if (
+                not replace_succ
+            ):  # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
                 continue
             else:
                 break
@@ -380,9 +434,9 @@ def replace_eq_blk(eqinfo, text_block):
 def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
     """替换行内公式"""
     for eqinfo in inline_equation_bboxes:
-        eqbox = eqinfo['bbox']
+        eqbox = eqinfo["bbox"]
         for blk in raw_text_blocks:
-            if _is_xin(eqbox, blk['bbox']):
+            if _is_xin(eqbox, blk["bbox"]):
                 if not replace_eq_blk(eqinfo, blk):
                     logger.error(f"行内公式没有替换成功:{eqinfo} ")
                 else:
@@ -390,94 +444,105 @@ def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
 
     return raw_text_blocks
 
+
 def remove_chars_in_text_blocks(text_blocks):
     """删除text_blocks里的char"""
     for blk in text_blocks:
-        for line in blk['lines']:
-            for span in line['spans']:
+        for line in blk["lines"]:
+            for span in line["spans"]:
                 _ = span.pop("chars", "no such key")
     return text_blocks
 
 
-def replace_equations_in_textblock(raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes):
+def replace_equations_in_textblock(
+    raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes
+):
     """
     替换行间和和行内公式为latex
     """
-    
-    raw_text_blocks = remove_text_block_in_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消除重叠:第一步,在公式内部的
-    raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消重,第二步,和公式覆盖的
+
+    raw_text_blocks = remove_text_block_in_interline_equation_bbox(
+        interline_equation_bboxes, raw_text_blocks
+    )  # 消除重叠:第一步,在公式内部的
+    raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
+        interline_equation_bboxes, raw_text_blocks
+    )  # 消重,第二步,和公式覆盖的
     insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
-    
+
     raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks)
-    
+
     return raw_text_blocks
-    
+
 
 def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
-    """
-    """
+    """ """
     new_pdf = f"{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf"
-    with open(json_path, "r", encoding='utf-8') as f:
+    with open(json_path, "r", encoding="utf-8") as f:
         obj = json.loads(f.read())
 
     if os.path.exists(new_pdf):
         os.remove(new_pdf)
-    new_doc = fitz.open('')
-    
+    new_doc = fitz.open("")
+
     doc = fitz.open(pdf_path)
     new_doc = fitz.open(pdf_path)
     for i in range(len(new_doc)):
         page = new_doc[i]
-        inline_equation_bboxes = obj[f"page_{i}"]['inline_equations']
-        interline_equation_bboxes = obj[f"page_{i}"]['interline_equations']
-        raw_text_blocks = obj[f'page_{i}']['preproc_blocks']
-        raw_text_blocks = remove_text_block_in_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消除重叠:第一步,在公式内部的
-        raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消重,第二步,和公式覆盖的
+        inline_equation_bboxes = obj[f"page_{i}"]["inline_equations"]
+        interline_equation_bboxes = obj[f"page_{i}"]["interline_equations"]
+        raw_text_blocks = obj[f"page_{i}"]["preproc_blocks"]
+        raw_text_blocks = remove_text_block_in_interline_equation_bbox(
+            interline_equation_bboxes, raw_text_blocks
+        )  # 消除重叠:第一步,在公式内部的
+        raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
+            interline_equation_bboxes, raw_text_blocks
+        )  # 消重,第二步,和公式覆盖的
         insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
-        raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks)
-        
-        
+        raw_text_blocks = replace_inline_equations(
+            inline_equation_bboxes, raw_text_blocks
+        )
+
         # 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的
-        color_map = [fitz.pdfcolor['blue'],fitz.pdfcolor['green']]
+        color_map = [fitz.pdfcolor["blue"], fitz.pdfcolor["green"]]
         j = 0
         for blk in raw_text_blocks:
-            for i,line in enumerate(blk['lines']):
-                
+            for i, line in enumerate(blk["lines"]):
+
                 # line_box = line['bbox']
                 # shape = page.new_shape()
                 # shape.draw_rect(line_box)
                 # shape.finish(color=fitz.pdfcolor['red'], fill=color_map[j%2], fill_opacity=0.3)
                 # shape.commit()
                 # j = j+1
-                
-                for i, span in enumerate(line['spans']):
+
+                for i, span in enumerate(line["spans"]):
                     shape_page = page.new_shape()
-                    span_type = span.get('_type')
-                    color = fitz.pdfcolor['blue']
-                    if span_type=='first':
-                        color = fitz.pdfcolor['blue']
-                    elif span_type=='tail':
-                        color = fitz.pdfcolor['green']
-                    elif span_type==TYPE_INLINE_EQUATION:
-                        color = fitz.pdfcolor['black']
+                    span_type = span.get("_type")
+                    color = fitz.pdfcolor["blue"]
+                    if span_type == "first":
+                        color = fitz.pdfcolor["blue"]
+                    elif span_type == "tail":
+                        color = fitz.pdfcolor["green"]
+                    elif span_type == TYPE_INLINE_EQUATION:
+                        color = fitz.pdfcolor["black"]
                     else:
                         color = None
-                        
-                    b = span['bbox']
+
+                    b = span["bbox"]
                     shape_page.draw_rect(b)
-                    
+
                     shape_page.finish(color=None, fill=color, fill_opacity=0.3)
                     shape_page.commit()
 
     new_doc.save(new_pdf)
     logger.info(f"save ok {new_pdf}")
-    final_json = json.dumps(obj, ensure_ascii=False,indent=2)
+    final_json = json.dumps(obj, ensure_ascii=False, indent=2)
     with open("equations_test/final_json.json", "w") as f:
         f.write(final_json)
-    
+
     return new_pdf
 
 
-if __name__=="__main__":
+if __name__ == "__main__":
     # draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf)
     pass

+ 1 - 1
magic_pdf/user_api.py

@@ -16,7 +16,7 @@ from loguru import logger
 
 from magic_pdf.rw import AbsReaderWriter
 from magic_pdf.pdf_parse_by_ocr_v2 import parse_pdf_by_ocr
-from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
+from magic_pdf.pdf_parse_by_txt_v2 import parse_pdf_by_txt
 
 
 PARSE_TYPE_TXT = "txt"