# Copyright (c) Opendatalab. All rights reserved. import copy import os import statistics import warnings from typing import List import torch from loguru import logger from mineru.utils.config_reader import get_device from mineru.utils.enum_class import BlockType, ModelPath from mineru.utils.models_download_utils import auto_download_and_get_model_root_path def sort_blocks_by_bbox(blocks, page_w, page_h, footnote_blocks): """获取所有line并计算正文line的高度""" line_height = get_line_height(blocks) """获取所有line并对line排序""" sorted_bboxes = sort_lines_by_model(blocks, page_w, page_h, line_height, footnote_blocks) """根据line的中位数算block的序列关系""" blocks = cal_block_index(blocks, sorted_bboxes) """将image和table的block还原回group形式参与后续流程""" blocks = revert_group_blocks(blocks) """重排block""" sorted_blocks = sorted(blocks, key=lambda b: b['index']) """block内重排(img和table的block内多个caption或footnote的排序)""" for block in sorted_blocks: if block['type'] in [BlockType.IMAGE, BlockType.TABLE]: block['blocks'] = sorted(block['blocks'], key=lambda b: b['index']) return sorted_blocks def get_line_height(blocks): page_line_height_list = [] for block in blocks: if block['type'] in [ BlockType.TEXT, BlockType.TITLE, BlockType.IMAGE_CAPTION, BlockType.IMAGE_FOOTNOTE, BlockType.TABLE_CAPTION, BlockType.TABLE_FOOTNOTE ]: for line in block['lines']: bbox = line['bbox'] page_line_height_list.append(int(bbox[3] - bbox[1])) if len(page_line_height_list) > 0: return statistics.median(page_line_height_list) else: return 10 def sort_lines_by_model(fix_blocks, page_w, page_h, line_height, footnote_blocks): page_line_list = [] def add_lines_to_block(b): line_bboxes = insert_lines_into_block(b['bbox'], line_height, page_w, page_h) b['lines'] = [] for line_bbox in line_bboxes: b['lines'].append({'bbox': line_bbox, 'spans': []}) page_line_list.extend(line_bboxes) for block in fix_blocks: if block['type'] in [ BlockType.TEXT, BlockType.TITLE, BlockType.IMAGE_CAPTION, BlockType.IMAGE_FOOTNOTE, BlockType.TABLE_CAPTION, BlockType.TABLE_FOOTNOTE ]: if len(block['lines']) == 0: add_lines_to_block(block) elif block['type'] in [BlockType.TITLE] and len(block['lines']) == 1 and (block['bbox'][3] - block['bbox'][1]) > line_height * 2: block['real_lines'] = copy.deepcopy(block['lines']) add_lines_to_block(block) else: for line in block['lines']: bbox = line['bbox'] page_line_list.append(bbox) elif block['type'] in [BlockType.IMAGE_BODY, BlockType.TABLE_BODY, BlockType.INTERLINE_EQUATION]: block['real_lines'] = copy.deepcopy(block['lines']) add_lines_to_block(block) for block in footnote_blocks: footnote_block = {'bbox': block[:4]} add_lines_to_block(footnote_block) if len(page_line_list) > 200: # layoutreader最高支持512line return None # 使用layoutreader排序 x_scale = 1000.0 / page_w y_scale = 1000.0 / page_h boxes = [] # logger.info(f"Scale: {x_scale}, {y_scale}, Boxes len: {len(page_line_list)}") for left, top, right, bottom in page_line_list: if left < 0: logger.warning( f'left < 0, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}' ) # noqa: E501 left = 0 if right > page_w: logger.warning( f'right > page_w, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}' ) # noqa: E501 right = page_w if top < 0: logger.warning( f'top < 0, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}' ) # noqa: E501 top = 0 if bottom > page_h: logger.warning( f'bottom > page_h, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}' ) # noqa: E501 bottom = page_h left = round(left * x_scale) top = round(top * y_scale) right = round(right * x_scale) bottom = round(bottom * y_scale) assert ( 1000 >= right >= left >= 0 and 1000 >= bottom >= top >= 0 ), f'Invalid box. right: {right}, left: {left}, bottom: {bottom}, top: {top}' # noqa: E126, E121 boxes.append([left, top, right, bottom]) model_manager = ModelSingleton() model = model_manager.get_model('layoutreader') with torch.no_grad(): orders = do_predict(boxes, model) sorted_bboxes = [page_line_list[i] for i in orders] return sorted_bboxes def insert_lines_into_block(block_bbox, line_height, page_w, page_h): # block_bbox是一个元组(x0, y0, x1, y1),其中(x0, y0)是左下角坐标,(x1, y1)是右上角坐标 x0, y0, x1, y1 = block_bbox block_height = y1 - y0 block_weight = x1 - x0 # 如果block高度小于n行正文,则直接返回block的bbox if line_height * 2 < block_height: if ( block_height > page_h * 0.25 and page_w * 0.5 > block_weight > page_w * 0.25 ): # 可能是双列结构,可以切细点 lines = int(block_height / line_height) else: # 如果block的宽度超过0.4页面宽度,则将block分成3行(是一种复杂布局,图不能切的太细) if block_weight > page_w * 0.4: lines = 3 elif block_weight > page_w * 0.25: # (可能是三列结构,也切细点) lines = int(block_height / line_height) else: # 判断长宽比 if block_height / block_weight > 1.2: # 细长的不分 return [[x0, y0, x1, y1]] else: # 不细长的还是分成两行 lines = 2 line_height = (y1 - y0) / lines # 确定从哪个y位置开始绘制线条 current_y = y0 # 用于存储线条的位置信息[(x0, y), ...] lines_positions = [] for i in range(lines): lines_positions.append([x0, current_y, x1, current_y + line_height]) current_y += line_height return lines_positions else: return [[x0, y0, x1, y1]] def model_init(model_name: str): from transformers import LayoutLMv3ForTokenClassification device_name = get_device() bf_16_support = False if device_name.startswith("cuda"): bf_16_support = torch.cuda.is_bf16_supported() elif device_name.startswith("mps"): bf_16_support = True device = torch.device(device_name) if model_name == 'layoutreader': # 检测modelscope的缓存目录是否存在 layoutreader_model_dir = os.path.join(auto_download_and_get_model_root_path(ModelPath.layout_reader), ModelPath.layout_reader) if os.path.exists(layoutreader_model_dir): model = LayoutLMv3ForTokenClassification.from_pretrained( layoutreader_model_dir ) else: logger.warning( 'local layoutreader model not exists, use online model from huggingface' ) model = LayoutLMv3ForTokenClassification.from_pretrained( 'hantian/layoutreader' ) if bf_16_support: model.to(device).eval().bfloat16() else: model.to(device).eval() else: logger.error('model name not allow') exit(1) return model class ModelSingleton: _instance = None _models = {} def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def get_model(self, model_name: str): if model_name not in self._models: self._models[model_name] = model_init(model_name=model_name) return self._models[model_name] def do_predict(boxes: List[List[int]], model) -> List[int]: from mineru.model.reading_order.layout_reader import ( boxes2inputs, parse_logits, prepare_inputs) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=FutureWarning, module="transformers") inputs = boxes2inputs(boxes) inputs = prepare_inputs(inputs, model) logits = model(**inputs).logits.cpu().squeeze(0) return parse_logits(logits, len(boxes)) def cal_block_index(fix_blocks, sorted_bboxes): if sorted_bboxes is not None: # 使用layoutreader排序 for block in fix_blocks: line_index_list = [] if len(block['lines']) == 0: block['index'] = sorted_bboxes.index(block['bbox']) else: for line in block['lines']: line['index'] = sorted_bboxes.index(line['bbox']) line_index_list.append(line['index']) median_value = statistics.median(line_index_list) block['index'] = median_value # 删除图表body block中的虚拟line信息, 并用real_lines信息回填 if block['type'] in [BlockType.IMAGE_BODY, BlockType.TABLE_BODY, BlockType.TITLE, BlockType.INTERLINE_EQUATION]: if 'real_lines' in block: block['virtual_lines'] = copy.deepcopy(block['lines']) block['lines'] = copy.deepcopy(block['real_lines']) del block['real_lines'] else: # 使用xycut排序 block_bboxes = [] for block in fix_blocks: # 如果block['bbox']任意值小于0,将其置为0 block['bbox'] = [max(0, x) for x in block['bbox']] block_bboxes.append(block['bbox']) # 删除图表body block中的虚拟line信息, 并用real_lines信息回填 if block['type'] in [BlockType.IMAGE_BODY, BlockType.TABLE_BODY, BlockType.TITLE, BlockType.INTERLINE_EQUATION]: if 'real_lines' in block: block['virtual_lines'] = copy.deepcopy(block['lines']) block['lines'] = copy.deepcopy(block['real_lines']) del block['real_lines'] import numpy as np from mineru.model.reading_order.xycut import recursive_xy_cut random_boxes = np.array(block_bboxes) np.random.shuffle(random_boxes) res = [] recursive_xy_cut(np.asarray(random_boxes).astype(int), np.arange(len(block_bboxes)), res) assert len(res) == len(block_bboxes) sorted_boxes = random_boxes[np.array(res)].tolist() for i, block in enumerate(fix_blocks): block['index'] = sorted_boxes.index(block['bbox']) # 生成line index sorted_blocks = sorted(fix_blocks, key=lambda b: b['index']) line_inedx = 1 for block in sorted_blocks: for line in block['lines']: line['index'] = line_inedx line_inedx += 1 return fix_blocks def revert_group_blocks(blocks): image_groups = {} table_groups = {} new_blocks = [] for block in blocks: if block['type'] in [BlockType.IMAGE_BODY, BlockType.IMAGE_CAPTION, BlockType.IMAGE_FOOTNOTE]: group_id = block['group_id'] if group_id not in image_groups: image_groups[group_id] = [] image_groups[group_id].append(block) elif block['type'] in [BlockType.TABLE_BODY, BlockType.TABLE_CAPTION, BlockType.TABLE_FOOTNOTE]: group_id = block['group_id'] if group_id not in table_groups: table_groups[group_id] = [] table_groups[group_id].append(block) else: new_blocks.append(block) for group_id, blocks in image_groups.items(): new_blocks.append(process_block_list(blocks, BlockType.IMAGE_BODY, BlockType.IMAGE)) for group_id, blocks in table_groups.items(): new_blocks.append(process_block_list(blocks, BlockType.TABLE_BODY, BlockType.TABLE)) return new_blocks def process_block_list(blocks, body_type, block_type): indices = [block['index'] for block in blocks] median_index = statistics.median(indices) body_bbox = next((block['bbox'] for block in blocks if block.get('type') == body_type), []) return { 'type': block_type, 'bbox': body_bbox, 'blocks': blocks, 'index': median_index, }