para_split_v3.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import copy
  2. from magic_pdf.libs.Constants import LINES_DELETED, CROSS_PAGE
  3. LINE_STOP_FLAG = ('.', '!', '?', '。', '!', '?')
  4. def __process_blocks(blocks):
  5. result = []
  6. current_group = []
  7. for i in range(len(blocks)):
  8. current_block = blocks[i]
  9. # 如果当前块是 text 类型
  10. if current_block['type'] == 'text':
  11. current_block["bbox_fs"] = copy.deepcopy(current_block["bbox"])
  12. if len(current_block["lines"]) > 0:
  13. current_block['bbox_fs'] = [min([line['bbox'][0] for line in current_block['lines']]),
  14. min([line['bbox'][1] for line in current_block['lines']]),
  15. max([line['bbox'][2] for line in current_block['lines']]),
  16. max([line['bbox'][3] for line in current_block['lines']])]
  17. current_group.append(current_block)
  18. # 检查下一个块是否存在
  19. if i + 1 < len(blocks):
  20. next_block = blocks[i + 1]
  21. # 如果下一个块不是 text 类型且是 title 或 interline_equation 类型
  22. if next_block['type'] in ['title', 'interline_equation']:
  23. result.append(current_group)
  24. current_group = []
  25. # 处理最后一个 group
  26. if current_group:
  27. result.append(current_group)
  28. return result
  29. def __merge_2_blocks(block1, block2):
  30. if len(block1['lines']) > 0:
  31. first_line = block1['lines'][0]
  32. line_height = first_line['bbox'][3] - first_line['bbox'][1]
  33. if abs(block1['bbox_fs'][0] - first_line['bbox'][0]) < line_height/2:
  34. last_line = block2['lines'][-1]
  35. if len(last_line['spans']) > 0:
  36. last_span = last_line['spans'][-1]
  37. line_height = last_line['bbox'][3] - last_line['bbox'][1]
  38. if abs(block2['bbox_fs'][2] - last_line['bbox'][2]) < line_height and not last_span['content'].endswith(LINE_STOP_FLAG):
  39. if block1['page_num'] != block2['page_num']:
  40. for line in block1['lines']:
  41. for span in line['spans']:
  42. span[CROSS_PAGE] = True
  43. block2['lines'].extend(block1['lines'])
  44. block1['lines'] = []
  45. block1[LINES_DELETED] = True
  46. return block1, block2
  47. def __para_merge_page(blocks):
  48. page_text_blocks_groups = __process_blocks(blocks)
  49. for text_blocks_group in page_text_blocks_groups:
  50. if len(text_blocks_group) > 1:
  51. # 倒序遍历
  52. for i in range(len(text_blocks_group)-1, -1, -1):
  53. current_block = text_blocks_group[i]
  54. # 检查是否有前一个块
  55. if i - 1 >= 0:
  56. prev_block = text_blocks_group[i - 1]
  57. __merge_2_blocks(current_block, prev_block)
  58. else:
  59. continue
  60. def para_split(pdf_info_dict, debug_mode=False):
  61. all_blocks = []
  62. for page_num, page in pdf_info_dict.items():
  63. blocks = copy.deepcopy(page['preproc_blocks'])
  64. for block in blocks:
  65. block['page_num'] = page_num
  66. all_blocks.extend(blocks)
  67. __para_merge_page(all_blocks)
  68. for page_num, page in pdf_info_dict.items():
  69. page['para_blocks'] = []
  70. for block in all_blocks:
  71. if block['page_num'] == page_num:
  72. page['para_blocks'].append(block)
  73. if __name__ == '__main__':
  74. input_blocks = [
  75. {'type': 'text', 'content': '这是第一段'},
  76. {'type': 'text', 'content': '这是第二段'},
  77. {'type': 'title', 'content': '这是一个标题'},
  78. {'type': 'text', 'content': '这是第三段'},
  79. {'type': 'interline_equation', 'content': '这是一个公式'},
  80. {'type': 'text', 'content': '这是第四段'},
  81. {'type': 'image', 'content': '这是一张图片'},
  82. {'type': 'text', 'content': '这是第五段'},
  83. {'type': 'table', 'content': '这是一张表格'}
  84. ]
  85. # 调用函数
  86. for group_index, group in enumerate(__process_blocks(input_blocks)):
  87. print(f"Group {group_index}: {group}")