|
|
@@ -3,46 +3,36 @@ from typing import Literal
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
-from mineru.utils.enum_class import ContentType, BlockType, SplitFlag
|
|
|
-from mineru.backend.vlm.vlm_middle_json_mkcontent import merge_para_with_text
|
|
|
-from mineru.utils.format_utils import block_content_to_html
|
|
|
+from mineru.utils.boxbase import calculate_overlap_area_in_bbox1_area_ratio
|
|
|
+from mineru.utils.enum_class import ContentType, BlockType
|
|
|
from mineru.utils.magic_model_utils import reduct_overlap, tie_up_category_by_distance_v3
|
|
|
|
|
|
|
|
|
class MagicModel:
|
|
|
- def __init__(self, token: str, width, height):
|
|
|
- self.token = token
|
|
|
-
|
|
|
- # 使用正则表达式查找所有块
|
|
|
- pattern = (
|
|
|
- r"<\|box_start\|>(.*?)<\|box_end\|><\|ref_start\|>(.*?)<\|ref_end\|><\|md_start\|>(.*?)(?:<\|md_end\|>|<\|im_end\|>)"
|
|
|
- )
|
|
|
- block_infos = re.findall(pattern, token, re.DOTALL)
|
|
|
+ def __init__(self, page_blocks: list, width, height):
|
|
|
+ self.page_blocks = page_blocks
|
|
|
|
|
|
blocks = []
|
|
|
self.all_spans = []
|
|
|
# 解析每个块
|
|
|
- for index, block_info in enumerate(block_infos):
|
|
|
- block_bbox = block_info[0].strip()
|
|
|
+ for index, block_info in enumerate(page_blocks):
|
|
|
+ block_bbox = block_info["bbox"]
|
|
|
try:
|
|
|
- x1, y1, x2, y2 = map(int, block_bbox.split())
|
|
|
+ x1, y1, x2, y2 = block_bbox
|
|
|
x_1, y_1, x_2, y_2 = (
|
|
|
- int(x1 * width / 1000),
|
|
|
- int(y1 * height / 1000),
|
|
|
- int(x2 * width / 1000),
|
|
|
- int(y2 * height / 1000),
|
|
|
+ int(x1 * width),
|
|
|
+ int(y1 * height),
|
|
|
+ int(x2 * width),
|
|
|
+ int(y2 * height),
|
|
|
)
|
|
|
if x_2 < x_1:
|
|
|
x_1, x_2 = x_2, x_1
|
|
|
if y_2 < y_1:
|
|
|
y_1, y_2 = y_2, y_1
|
|
|
block_bbox = (x_1, y_1, x_2, y_2)
|
|
|
- block_type = block_info[1].strip()
|
|
|
- block_content = block_info[2].strip()
|
|
|
-
|
|
|
- # 如果bbox是0,0,999,999,且type为text,按notes增加表格处理
|
|
|
- if x1 == 0 and y1 == 0 and x2 == 999 and y2 == 999 and block_type == "text":
|
|
|
- block_content = block_content_to_html(block_content)
|
|
|
+ block_type = block_info["type"]
|
|
|
+ block_content = block_info["content"]
|
|
|
+ block_angle = block_info["angle"]
|
|
|
|
|
|
# print(f"坐标: {block_bbox}")
|
|
|
# print(f"类型: {block_type}")
|
|
|
@@ -54,6 +44,7 @@ class MagicModel:
|
|
|
continue
|
|
|
|
|
|
span_type = "unknown"
|
|
|
+
|
|
|
if block_type in [
|
|
|
"text",
|
|
|
"title",
|
|
|
@@ -61,8 +52,15 @@ class MagicModel:
|
|
|
"image_footnote",
|
|
|
"table_caption",
|
|
|
"table_footnote",
|
|
|
- "list",
|
|
|
- "index",
|
|
|
+ "code_caption",
|
|
|
+ "ref_text",
|
|
|
+ "phonetic",
|
|
|
+ "header",
|
|
|
+ "footer",
|
|
|
+ "page_number",
|
|
|
+ "aside_text",
|
|
|
+ "page_footnote",
|
|
|
+ "list"
|
|
|
]:
|
|
|
span_type = ContentType.TEXT
|
|
|
elif block_type in ["image"]:
|
|
|
@@ -71,6 +69,10 @@ class MagicModel:
|
|
|
elif block_type in ["table"]:
|
|
|
block_type = BlockType.TABLE_BODY
|
|
|
span_type = ContentType.TABLE
|
|
|
+ elif block_type in ["code", "algorithm"]:
|
|
|
+ line_type = block_type
|
|
|
+ block_type = BlockType.CODE_BODY
|
|
|
+ span_type = ContentType.TEXT
|
|
|
elif block_type in ["equation"]:
|
|
|
block_type = BlockType.INTERLINE_EQUATION
|
|
|
span_type = ContentType.INTERLINE_EQUATION
|
|
|
@@ -81,7 +83,7 @@ class MagicModel:
|
|
|
"type": span_type,
|
|
|
}
|
|
|
if span_type == ContentType.TABLE:
|
|
|
- span["html"] = block_content_to_html(block_content)
|
|
|
+ span["html"] = block_content
|
|
|
elif span_type in [ContentType.INTERLINE_EQUATION]:
|
|
|
span = {
|
|
|
"bbox": block_bbox,
|
|
|
@@ -89,7 +91,12 @@ class MagicModel:
|
|
|
"content": isolated_formula_clean(block_content),
|
|
|
}
|
|
|
else:
|
|
|
- if block_content.count("\\(") == block_content.count("\\)") and block_content.count("\\(") > 0:
|
|
|
+
|
|
|
+ if block_content:
|
|
|
+ block_content = clean_content(block_content)
|
|
|
+
|
|
|
+ if block_content and block_content.count("\\(") == block_content.count("\\)") and block_content.count("\\(") > 0:
|
|
|
+
|
|
|
# 生成包含文本和公式的span列表
|
|
|
spans = []
|
|
|
last_end = 0
|
|
|
@@ -138,16 +145,30 @@ class MagicModel:
|
|
|
|
|
|
if isinstance(span, dict) and "bbox" in span:
|
|
|
self.all_spans.append(span)
|
|
|
- line = {
|
|
|
- "bbox": block_bbox,
|
|
|
- "spans": [span],
|
|
|
- }
|
|
|
+ if block_type == BlockType.CODE_BODY:
|
|
|
+ line = {
|
|
|
+ "bbox": block_bbox,
|
|
|
+ "spans": [span],
|
|
|
+ "type": line_type
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ line = {
|
|
|
+ "bbox": block_bbox,
|
|
|
+ "spans": [span],
|
|
|
+ }
|
|
|
elif isinstance(span, list):
|
|
|
self.all_spans.extend(span)
|
|
|
- line = {
|
|
|
- "bbox": block_bbox,
|
|
|
- "spans": span,
|
|
|
- }
|
|
|
+ if block_type == BlockType.CODE_BODY:
|
|
|
+ line = {
|
|
|
+ "bbox": block_bbox,
|
|
|
+ "spans": span,
|
|
|
+ "type": line_type
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ line = {
|
|
|
+ "bbox": block_bbox,
|
|
|
+ "spans": span,
|
|
|
+ }
|
|
|
else:
|
|
|
raise ValueError(f"Invalid span type: {span_type}, expected dict or list, got {type(span)}")
|
|
|
|
|
|
@@ -155,6 +176,7 @@ class MagicModel:
|
|
|
{
|
|
|
"bbox": block_bbox,
|
|
|
"type": block_type,
|
|
|
+ "angle": block_angle,
|
|
|
"lines": [line],
|
|
|
"index": index,
|
|
|
}
|
|
|
@@ -165,35 +187,83 @@ class MagicModel:
|
|
|
self.interline_equation_blocks = []
|
|
|
self.text_blocks = []
|
|
|
self.title_blocks = []
|
|
|
+ self.code_blocks = []
|
|
|
+ self.discarded_blocks = []
|
|
|
+ self.ref_text_blocks = []
|
|
|
+ self.phonetic_blocks = []
|
|
|
+ self.list_blocks = []
|
|
|
for block in blocks:
|
|
|
if block["type"] in [BlockType.IMAGE_BODY, BlockType.IMAGE_CAPTION, BlockType.IMAGE_FOOTNOTE]:
|
|
|
self.image_blocks.append(block)
|
|
|
elif block["type"] in [BlockType.TABLE_BODY, BlockType.TABLE_CAPTION, BlockType.TABLE_FOOTNOTE]:
|
|
|
self.table_blocks.append(block)
|
|
|
+ elif block["type"] in [BlockType.CODE_BODY, BlockType.CODE_CAPTION]:
|
|
|
+ self.code_blocks.append(block)
|
|
|
elif block["type"] == BlockType.INTERLINE_EQUATION:
|
|
|
self.interline_equation_blocks.append(block)
|
|
|
elif block["type"] == BlockType.TEXT:
|
|
|
self.text_blocks.append(block)
|
|
|
elif block["type"] == BlockType.TITLE:
|
|
|
self.title_blocks.append(block)
|
|
|
+ elif block["type"] in [BlockType.REF_TEXT]:
|
|
|
+ self.ref_text_blocks.append(block)
|
|
|
+ elif block["type"] in [BlockType.PHONETIC]:
|
|
|
+ self.phonetic_blocks.append(block)
|
|
|
+ elif block["type"] in [BlockType.HEADER, BlockType.FOOTER, BlockType.PAGE_NUMBER, BlockType.ASIDE_TEXT, BlockType.PAGE_FOOTNOTE]:
|
|
|
+ self.discarded_blocks.append(block)
|
|
|
+ elif block["type"] == BlockType.LIST:
|
|
|
+ self.list_blocks.append(block)
|
|
|
else:
|
|
|
continue
|
|
|
|
|
|
+ self.list_blocks, self.text_blocks, self.ref_text_blocks = fix_list_blocks(self.list_blocks, self.text_blocks, self.ref_text_blocks)
|
|
|
+ self.image_blocks, not_include_image_blocks = fix_two_layer_blocks(self.image_blocks, BlockType.IMAGE)
|
|
|
+ self.table_blocks, not_include_table_blocks = fix_two_layer_blocks(self.table_blocks, BlockType.TABLE)
|
|
|
+ self.code_blocks, not_include_code_blocks = fix_two_layer_blocks(self.code_blocks, BlockType.CODE)
|
|
|
+ for code_block in self.code_blocks:
|
|
|
+ for block in code_block['blocks']:
|
|
|
+ if block['type'] == BlockType.CODE_BODY:
|
|
|
+ for line in block["lines"]:
|
|
|
+ if "type" in line:
|
|
|
+ code_block["sub_type"] = line["type"]
|
|
|
+ del line["type"]
|
|
|
+ else:
|
|
|
+ code_block["sub_type"] = "code"
|
|
|
+ for block in not_include_image_blocks + not_include_table_blocks + not_include_code_blocks:
|
|
|
+ block["type"] = BlockType.TEXT
|
|
|
+ self.text_blocks.append(block)
|
|
|
+
|
|
|
+
|
|
|
+ def get_list_blocks(self):
|
|
|
+ return self.list_blocks
|
|
|
+
|
|
|
def get_image_blocks(self):
|
|
|
- return fix_two_layer_blocks(self.image_blocks, BlockType.IMAGE)
|
|
|
+ return self.image_blocks
|
|
|
|
|
|
def get_table_blocks(self):
|
|
|
- return fix_two_layer_blocks(self.table_blocks, BlockType.TABLE)
|
|
|
+ return self.table_blocks
|
|
|
+
|
|
|
+ def get_code_blocks(self):
|
|
|
+ return self.code_blocks
|
|
|
+
|
|
|
+ def get_ref_text_blocks(self):
|
|
|
+ return self.ref_text_blocks
|
|
|
+
|
|
|
+ def get_phonetic_blocks(self):
|
|
|
+ return self.phonetic_blocks
|
|
|
|
|
|
def get_title_blocks(self):
|
|
|
- return fix_title_blocks(self.title_blocks)
|
|
|
+ return self.title_blocks
|
|
|
|
|
|
def get_text_blocks(self):
|
|
|
- return fix_text_blocks(self.text_blocks)
|
|
|
+ return self.text_blocks
|
|
|
|
|
|
def get_interline_equation_blocks(self):
|
|
|
return self.interline_equation_blocks
|
|
|
|
|
|
+ def get_discarded_blocks(self):
|
|
|
+ return self.discarded_blocks
|
|
|
+
|
|
|
def get_all_spans(self):
|
|
|
return self.all_spans
|
|
|
|
|
|
@@ -202,48 +272,23 @@ def isolated_formula_clean(txt):
|
|
|
latex = txt[:]
|
|
|
if latex.startswith("\\["): latex = latex[2:]
|
|
|
if latex.endswith("\\]"): latex = latex[:-2]
|
|
|
- latex = latex_fix(latex.strip())
|
|
|
+ latex = latex.strip()
|
|
|
return latex
|
|
|
|
|
|
|
|
|
-def latex_fix(latex):
|
|
|
- # valid pairs:
|
|
|
- # \left\{ ... \right\}
|
|
|
- # \left( ... \right)
|
|
|
- # \left| ... \right|
|
|
|
- # \left\| ... \right\|
|
|
|
- # \left[ ... \right]
|
|
|
-
|
|
|
- LEFT_COUNT_PATTERN = re.compile(r'\\left(?![a-zA-Z])')
|
|
|
- RIGHT_COUNT_PATTERN = re.compile(r'\\right(?![a-zA-Z])')
|
|
|
- left_count = len(LEFT_COUNT_PATTERN.findall(latex)) # 不匹配\lefteqn等
|
|
|
- right_count = len(RIGHT_COUNT_PATTERN.findall(latex)) # 不匹配\rightarrow
|
|
|
-
|
|
|
- if left_count != right_count:
|
|
|
- for _ in range(2):
|
|
|
- # replace valid pairs
|
|
|
- latex = re.sub(r'\\left\\\{', "{", latex) # \left\{
|
|
|
- latex = re.sub(r"\\left\|", "|", latex) # \left|
|
|
|
- latex = re.sub(r"\\left\\\|", "|", latex) # \left\|
|
|
|
- latex = re.sub(r"\\left\(", "(", latex) # \left(
|
|
|
- latex = re.sub(r"\\left\[", "[", latex) # \left[
|
|
|
-
|
|
|
- latex = re.sub(r"\\right\\\}", "}", latex) # \right\}
|
|
|
- latex = re.sub(r"\\right\|", "|", latex) # \right|
|
|
|
- latex = re.sub(r"\\right\\\|", "|", latex) # \right\|
|
|
|
- latex = re.sub(r"\\right\)", ")", latex) # \right)
|
|
|
- latex = re.sub(r"\\right\]", "]", latex) # \right]
|
|
|
- latex = re.sub(r"\\right\.", "", latex) # \right.
|
|
|
-
|
|
|
- # replace invalid pairs first
|
|
|
- latex = re.sub(r'\\left\{', "{", latex)
|
|
|
- latex = re.sub(r'\\right\}', "}", latex) # \left{ ... \right}
|
|
|
- latex = re.sub(r'\\left\\\(', "(", latex)
|
|
|
- latex = re.sub(r'\\right\\\)', ")", latex) # \left\( ... \right\)
|
|
|
- latex = re.sub(r'\\left\\\[', "[", latex)
|
|
|
- latex = re.sub(r'\\right\\\]', "]", latex) # \left\[ ... \right\]
|
|
|
+def clean_content(content):
|
|
|
+ if content and content.count("\\[") == content.count("\\]") and content.count("\\[") > 0:
|
|
|
+ # Function to handle each match
|
|
|
+ def replace_pattern(match):
|
|
|
+ # Extract content between \[ and \]
|
|
|
+ inner_content = match.group(1)
|
|
|
+ return f"[{inner_content}]"
|
|
|
|
|
|
- return latex
|
|
|
+ # Find all patterns of \[x\] and apply replacement
|
|
|
+ pattern = r'\\\[(.*?)\\\]'
|
|
|
+ content = re.sub(pattern, replace_pattern, content)
|
|
|
+
|
|
|
+ return content
|
|
|
|
|
|
|
|
|
def __tie_up_category_by_distance_v3(blocks, subject_block_type, object_block_type):
|
|
|
@@ -252,7 +297,7 @@ def __tie_up_category_by_distance_v3(blocks, subject_block_type, object_block_ty
|
|
|
return reduct_overlap(
|
|
|
list(
|
|
|
map(
|
|
|
- lambda x: {"bbox": x["bbox"], "lines": x["lines"], "index": x["index"]},
|
|
|
+ lambda x: {"bbox": x["bbox"], "lines": x["lines"], "index": x["index"], "angle":x["angle"]},
|
|
|
filter(
|
|
|
lambda x: x["type"] == subject_block_type,
|
|
|
blocks,
|
|
|
@@ -265,7 +310,7 @@ def __tie_up_category_by_distance_v3(blocks, subject_block_type, object_block_ty
|
|
|
return reduct_overlap(
|
|
|
list(
|
|
|
map(
|
|
|
- lambda x: {"bbox": x["bbox"], "lines": x["lines"], "index": x["index"]},
|
|
|
+ lambda x: {"bbox": x["bbox"], "lines": x["lines"], "index": x["index"], "angle":x["angle"]},
|
|
|
filter(
|
|
|
lambda x: x["type"] == object_block_type,
|
|
|
blocks,
|
|
|
@@ -281,7 +326,7 @@ def __tie_up_category_by_distance_v3(blocks, subject_block_type, object_block_ty
|
|
|
)
|
|
|
|
|
|
|
|
|
-def get_type_blocks(blocks, block_type: Literal["image", "table"]):
|
|
|
+def get_type_blocks(blocks, block_type: Literal["image", "table", "code"]):
|
|
|
with_captions = __tie_up_category_by_distance_v3(blocks, f"{block_type}_body", f"{block_type}_caption")
|
|
|
with_footnotes = __tie_up_category_by_distance_v3(blocks, f"{block_type}_body", f"{block_type}_footnote")
|
|
|
ret = []
|
|
|
@@ -297,9 +342,13 @@ def get_type_blocks(blocks, block_type: Literal["image", "table"]):
|
|
|
return ret
|
|
|
|
|
|
|
|
|
-def fix_two_layer_blocks(blocks, fix_type: Literal["image", "table"]):
|
|
|
+def fix_two_layer_blocks(blocks, fix_type: Literal["image", "table", "code"]):
|
|
|
need_fix_blocks = get_type_blocks(blocks, fix_type)
|
|
|
fixed_blocks = []
|
|
|
+ not_include_blocks = []
|
|
|
+ processed_indices = set()
|
|
|
+
|
|
|
+ # 处理需要组织成two_layer结构的blocks
|
|
|
for block in need_fix_blocks:
|
|
|
body = block[f"{fix_type}_body"]
|
|
|
caption_list = block[f"{fix_type}_caption_list"]
|
|
|
@@ -308,8 +357,12 @@ def fix_two_layer_blocks(blocks, fix_type: Literal["image", "table"]):
|
|
|
body["type"] = f"{fix_type}_body"
|
|
|
for caption in caption_list:
|
|
|
caption["type"] = f"{fix_type}_caption"
|
|
|
+ processed_indices.add(caption["index"])
|
|
|
for footnote in footnote_list:
|
|
|
footnote["type"] = f"{fix_type}_footnote"
|
|
|
+ processed_indices.add(footnote["index"])
|
|
|
+
|
|
|
+ processed_indices.add(body["index"])
|
|
|
|
|
|
two_layer_block = {
|
|
|
"type": fix_type,
|
|
|
@@ -323,58 +376,52 @@ def fix_two_layer_blocks(blocks, fix_type: Literal["image", "table"]):
|
|
|
|
|
|
fixed_blocks.append(two_layer_block)
|
|
|
|
|
|
- return fixed_blocks
|
|
|
-
|
|
|
-
|
|
|
-def fix_title_blocks(blocks):
|
|
|
+ # 添加未处理的blocks
|
|
|
for block in blocks:
|
|
|
- if block["type"] == BlockType.TITLE:
|
|
|
- title_content = merge_para_with_text(block)
|
|
|
- title_level = count_leading_hashes(title_content)
|
|
|
- block['level'] = title_level
|
|
|
- for line in block['lines']:
|
|
|
- for span in line['spans']:
|
|
|
- span['content'] = strip_leading_hashes(span['content'])
|
|
|
- break
|
|
|
+ if block["index"] not in processed_indices:
|
|
|
+ # 直接添加未处理的block
|
|
|
+ not_include_blocks.append(block)
|
|
|
+
|
|
|
+ return fixed_blocks, not_include_blocks
|
|
|
+
|
|
|
+
|
|
|
+def fix_list_blocks(list_blocks, text_blocks, ref_text_blocks):
|
|
|
+ for list_block in list_blocks:
|
|
|
+ list_block["blocks"] = []
|
|
|
+ if "lines" in list_block:
|
|
|
+ del list_block["lines"]
|
|
|
+
|
|
|
+ temp_text_blocks = text_blocks + ref_text_blocks
|
|
|
+ need_remove_blocks = []
|
|
|
+ for block in temp_text_blocks:
|
|
|
+ for list_block in list_blocks:
|
|
|
+ if calculate_overlap_area_in_bbox1_area_ratio(block["bbox"], list_block["bbox"]) >= 0.8:
|
|
|
+ list_block["blocks"].append(block)
|
|
|
+ need_remove_blocks.append(block)
|
|
|
break
|
|
|
- return blocks
|
|
|
-
|
|
|
-
|
|
|
-def count_leading_hashes(text):
|
|
|
- match = re.match(r'^(#+)', text)
|
|
|
- return len(match.group(1)) if match else 0
|
|
|
-
|
|
|
-
|
|
|
-def strip_leading_hashes(text):
|
|
|
- # 去除开头的#和紧随其后的空格
|
|
|
- return re.sub(r'^#+\s*', '', text)
|
|
|
-
|
|
|
-
|
|
|
-def fix_text_blocks(blocks):
|
|
|
- i = 0
|
|
|
- while i < len(blocks):
|
|
|
- block = blocks[i]
|
|
|
- last_line = block["lines"][-1]if block["lines"] else None
|
|
|
- if last_line:
|
|
|
- last_span = last_line["spans"][-1] if last_line["spans"] else None
|
|
|
- if last_span and last_span['content'].endswith('<|txt_contd|>'):
|
|
|
- last_span['content'] = last_span['content'][:-len('<|txt_contd|>')]
|
|
|
-
|
|
|
- # 查找下一个未被清空的块
|
|
|
- next_idx = i + 1
|
|
|
- while next_idx < len(blocks) and blocks[next_idx].get(SplitFlag.LINES_DELETED, False):
|
|
|
- next_idx += 1
|
|
|
-
|
|
|
- # 如果找到下一个有效块,则合并
|
|
|
- if next_idx < len(blocks):
|
|
|
- next_block = blocks[next_idx]
|
|
|
- # 将下一个块的lines扩展到当前块的lines中
|
|
|
- block["lines"].extend(next_block["lines"])
|
|
|
- # 清空下一个块的lines
|
|
|
- next_block["lines"] = []
|
|
|
- # 在下一个块中添加标志
|
|
|
- next_block[SplitFlag.LINES_DELETED] = True
|
|
|
- # 不增加i,继续检查当前块(现在已包含下一个块的内容)
|
|
|
- continue
|
|
|
- i += 1
|
|
|
- return blocks
|
|
|
+
|
|
|
+ for block in need_remove_blocks:
|
|
|
+ if block in text_blocks:
|
|
|
+ text_blocks.remove(block)
|
|
|
+ elif block in ref_text_blocks:
|
|
|
+ ref_text_blocks.remove(block)
|
|
|
+
|
|
|
+ # 移除blocks为空的list_block
|
|
|
+ list_blocks = [lb for lb in list_blocks if lb["blocks"]]
|
|
|
+
|
|
|
+ for list_block in list_blocks:
|
|
|
+ # 统计list_block["blocks"]中所有block的type,用众数作为list_block的sub_type
|
|
|
+ type_count = {}
|
|
|
+ line_content = []
|
|
|
+ for sub_block in list_block["blocks"]:
|
|
|
+ sub_block_type = sub_block["type"]
|
|
|
+ if sub_block_type not in type_count:
|
|
|
+ type_count[sub_block_type] = 0
|
|
|
+ type_count[sub_block_type] += 1
|
|
|
+
|
|
|
+ if type_count:
|
|
|
+ list_block["sub_type"] = max(type_count, key=type_count.get)
|
|
|
+ else:
|
|
|
+ list_block["sub_type"] = "unknown"
|
|
|
+
|
|
|
+ return list_blocks, text_blocks, ref_text_blocks
|