# pipeline_middle_json_mkcontent.py
import re

from loguru import logger

from mineru.backend.pipeline.para_split import ListLineTag
from mineru.utils.config_reader import get_latex_delimiter_config
from mineru.utils.enum_class import BlockType, ContentType, MakeMode
from mineru.utils.language import detect_lang
  7. def __is_hyphen_at_line_end(line):
  8. """Check if a line ends with one or more letters followed by a hyphen.
  9. Args:
  10. line (str): The line of text to check.
  11. Returns:
  12. bool: True if the line ends with one or more letters followed by a hyphen, False otherwise.
  13. """
  14. # Use regex to check if the line ends with one or more letters followed by a hyphen
  15. return bool(re.search(r'[A-Za-z]+-\s*$', line))
  16. def make_blocks_to_markdown(paras_of_layout,
  17. mode,
  18. img_buket_path='',
  19. ):
  20. page_markdown = []
  21. for para_block in paras_of_layout:
  22. para_text = ''
  23. para_type = para_block['type']
  24. if para_type in [BlockType.TEXT, BlockType.LIST, BlockType.INDEX]:
  25. para_text = merge_para_with_text(para_block)
  26. elif para_type == BlockType.TITLE:
  27. title_level = get_title_level(para_block)
  28. para_text = f'{"#" * title_level} {merge_para_with_text(para_block)}'
  29. elif para_type == BlockType.INTERLINE_EQUATION:
  30. if para_block['lines'][0]['spans'][0].get('content', ''):
  31. para_text = merge_para_with_text(para_block)
  32. else:
  33. para_text += f"![]({img_buket_path}/{para_block['lines'][0]['spans'][0]['image_path']})"
  34. elif para_type == BlockType.IMAGE:
  35. if mode == MakeMode.NLP_MD:
  36. continue
  37. elif mode == MakeMode.MM_MD:
  38. # 检测是否存在图片脚注
  39. has_image_footnote = any(block['type'] == BlockType.IMAGE_FOOTNOTE for block in para_block['blocks'])
  40. # 如果存在图片脚注,则将图片脚注拼接到图片正文后面
  41. if has_image_footnote:
  42. for block in para_block['blocks']: # 1st.拼image_caption
  43. if block['type'] == BlockType.IMAGE_CAPTION:
  44. para_text += merge_para_with_text(block) + ' \n'
  45. for block in para_block['blocks']: # 2nd.拼image_body
  46. if block['type'] == BlockType.IMAGE_BODY:
  47. for line in block['lines']:
  48. for span in line['spans']:
  49. if span['type'] == ContentType.IMAGE:
  50. if span.get('image_path', ''):
  51. para_text += f"![]({img_buket_path}/{span['image_path']})"
  52. for block in para_block['blocks']: # 3rd.拼image_footnote
  53. if block['type'] == BlockType.IMAGE_FOOTNOTE:
  54. para_text += ' \n' + merge_para_with_text(block)
  55. else:
  56. for block in para_block['blocks']: # 1st.拼image_body
  57. if block['type'] == BlockType.IMAGE_BODY:
  58. for line in block['lines']:
  59. for span in line['spans']:
  60. if span['type'] == ContentType.IMAGE:
  61. if span.get('image_path', ''):
  62. para_text += f"![]({img_buket_path}/{span['image_path']})"
  63. for block in para_block['blocks']: # 2nd.拼image_caption
  64. if block['type'] == BlockType.IMAGE_CAPTION:
  65. para_text += ' \n' + merge_para_with_text(block)
  66. elif para_type == BlockType.TABLE:
  67. if mode == MakeMode.NLP_MD:
  68. continue
  69. elif mode == MakeMode.MM_MD:
  70. for block in para_block['blocks']: # 1st.拼table_caption
  71. if block['type'] == BlockType.TABLE_CAPTION:
  72. para_text += merge_para_with_text(block) + ' \n'
  73. for block in para_block['blocks']: # 2nd.拼table_body
  74. if block['type'] == BlockType.TABLE_BODY:
  75. for line in block['lines']:
  76. for span in line['spans']:
  77. if span['type'] == ContentType.TABLE:
  78. # if processed by table model
  79. if span.get('html', ''):
  80. para_text += f"\n{span['html']}\n"
  81. elif span.get('image_path', ''):
  82. para_text += f"![]({img_buket_path}/{span['image_path']})"
  83. for block in para_block['blocks']: # 3rd.拼table_footnote
  84. if block['type'] == BlockType.TABLE_FOOTNOTE:
  85. para_text += '\n' + merge_para_with_text(block) + ' '
  86. if para_text.strip() == '':
  87. continue
  88. else:
  89. # page_markdown.append(para_text.strip() + ' ')
  90. page_markdown.append(para_text.strip())
  91. return page_markdown
  92. def full_to_half(text: str) -> str:
  93. """Convert full-width characters to half-width characters using code point manipulation.
  94. Args:
  95. text: String containing full-width characters
  96. Returns:
  97. String with full-width characters converted to half-width
  98. """
  99. result = []
  100. for char in text:
  101. code = ord(char)
  102. # Full-width letters and numbers (FF21-FF3A for A-Z, FF41-FF5A for a-z, FF10-FF19 for 0-9)
  103. if (0xFF21 <= code <= 0xFF3A) or (0xFF41 <= code <= 0xFF5A) or (0xFF10 <= code <= 0xFF19):
  104. result.append(chr(code - 0xFEE0)) # Shift to ASCII range
  105. else:
  106. result.append(char)
  107. return ''.join(result)
  108. latex_delimiters_config = get_latex_delimiter_config()
  109. default_delimiters = {
  110. 'display': {'left': '$$', 'right': '$$'},
  111. 'inline': {'left': '$', 'right': '$'}
  112. }
  113. delimiters = latex_delimiters_config if latex_delimiters_config else default_delimiters
  114. display_left_delimiter = delimiters['display']['left']
  115. display_right_delimiter = delimiters['display']['right']
  116. inline_left_delimiter = delimiters['inline']['left']
  117. inline_right_delimiter = delimiters['inline']['right']
  118. def merge_para_with_text(para_block):
  119. block_text = ''
  120. for line in para_block['lines']:
  121. for span in line['spans']:
  122. if span['type'] in [ContentType.TEXT]:
  123. span['content'] = full_to_half(span['content'])
  124. block_text += span['content']
  125. block_lang = detect_lang(block_text)
  126. para_text = ''
  127. for i, line in enumerate(para_block['lines']):
  128. if i >= 1 and line.get(ListLineTag.IS_LIST_START_LINE, False):
  129. para_text += ' \n'
  130. for j, span in enumerate(line['spans']):
  131. span_type = span['type']
  132. content = ''
  133. if span_type == ContentType.TEXT:
  134. content = escape_special_markdown_char(span['content'])
  135. elif span_type == ContentType.INLINE_EQUATION:
  136. content = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}"
  137. elif span_type == ContentType.INTERLINE_EQUATION:
  138. content = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n"
  139. content = content.strip()
  140. if content:
  141. langs = ['zh', 'ja', 'ko']
  142. # logger.info(f'block_lang: {block_lang}, content: {content}')
  143. if block_lang in langs: # 中文/日语/韩文语境下,换行不需要空格分隔,但是如果是行内公式结尾,还是要加空格
  144. if j == len(line['spans']) - 1 and span_type not in [ContentType.INLINE_EQUATION]:
  145. para_text += content
  146. else:
  147. para_text += f'{content} '
  148. else:
  149. if span_type in [ContentType.TEXT, ContentType.INLINE_EQUATION]:
  150. # 如果span是line的最后一个且末尾带有-连字符,那么末尾不应该加空格,同时应该把-删除
  151. if j == len(line['spans'])-1 and span_type == ContentType.TEXT and __is_hyphen_at_line_end(content):
  152. para_text += content[:-1]
  153. else: # 西方文本语境下 content间需要空格分隔
  154. para_text += f'{content} '
  155. elif span_type == ContentType.INTERLINE_EQUATION:
  156. para_text += content
  157. else:
  158. continue
  159. return para_text
  160. def make_blocks_to_content_list(para_block, img_buket_path, page_idx):
  161. para_type = para_block['type']
  162. para_content = {}
  163. if para_type in [BlockType.TEXT, BlockType.LIST, BlockType.INDEX]:
  164. para_content = {
  165. 'type': 'text',
  166. 'text': merge_para_with_text(para_block),
  167. }
  168. elif para_type == BlockType.TITLE:
  169. para_content = {
  170. 'type': 'text',
  171. 'text': merge_para_with_text(para_block),
  172. }
  173. title_level = get_title_level(para_block)
  174. if title_level != 0:
  175. para_content['text_level'] = title_level
  176. elif para_type == BlockType.INTERLINE_EQUATION:
  177. para_content = {
  178. 'type': 'equation',
  179. 'img_path': f"{img_buket_path}/{para_block['lines'][0]['spans'][0].get('image_path', '')}",
  180. }
  181. if para_block['lines'][0]['spans'][0].get('content', ''):
  182. para_content['text'] = merge_para_with_text(para_block)
  183. para_content['text_format'] = 'latex'
  184. elif para_type == BlockType.IMAGE:
  185. para_content = {'type': 'image', 'img_path': '', 'img_caption': [], 'img_footnote': []}
  186. for block in para_block['blocks']:
  187. if block['type'] == BlockType.IMAGE_BODY:
  188. for line in block['lines']:
  189. for span in line['spans']:
  190. if span['type'] == ContentType.IMAGE:
  191. if span.get('image_path', ''):
  192. para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
  193. if block['type'] == BlockType.IMAGE_CAPTION:
  194. para_content['img_caption'].append(merge_para_with_text(block))
  195. if block['type'] == BlockType.IMAGE_FOOTNOTE:
  196. para_content['img_footnote'].append(merge_para_with_text(block))
  197. elif para_type == BlockType.TABLE:
  198. para_content = {'type': 'table', 'img_path': '', 'table_caption': [], 'table_footnote': []}
  199. for block in para_block['blocks']:
  200. if block['type'] == BlockType.TABLE_BODY:
  201. for line in block['lines']:
  202. for span in line['spans']:
  203. if span['type'] == ContentType.TABLE:
  204. if span.get('latex', ''):
  205. para_content['table_body'] = f"{span['latex']}"
  206. elif span.get('html', ''):
  207. para_content['table_body'] = f"{span['html']}"
  208. if span.get('image_path', ''):
  209. para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
  210. if block['type'] == BlockType.TABLE_CAPTION:
  211. para_content['table_caption'].append(merge_para_with_text(block))
  212. if block['type'] == BlockType.TABLE_FOOTNOTE:
  213. para_content['table_footnote'].append(merge_para_with_text(block))
  214. para_content['page_idx'] = page_idx
  215. return para_content
  216. def union_make(pdf_info_dict: list,
  217. make_mode: str,
  218. img_buket_path: str = '',
  219. ):
  220. output_content = []
  221. for page_info in pdf_info_dict:
  222. paras_of_layout = page_info.get('para_blocks')
  223. page_idx = page_info.get('page_idx')
  224. if not paras_of_layout:
  225. continue
  226. if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
  227. page_markdown = make_blocks_to_markdown(paras_of_layout, make_mode, img_buket_path)
  228. output_content.extend(page_markdown)
  229. elif make_mode == MakeMode.STANDARD_FORMAT:
  230. for para_block in paras_of_layout:
  231. para_content = make_blocks_to_content_list(para_block, img_buket_path, page_idx)
  232. output_content.append(para_content)
  233. if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
  234. return '\n\n'.join(output_content)
  235. elif make_mode == MakeMode.STANDARD_FORMAT:
  236. return output_content
  237. else:
  238. logger.error(f"Unsupported make mode: {make_mode}")
  239. return None
  240. def get_title_level(block):
  241. title_level = block.get('level', 1)
  242. if title_level > 4:
  243. title_level = 4
  244. elif title_level < 1:
  245. title_level = 0
  246. return title_level
  247. def escape_special_markdown_char(content):
  248. """
  249. 转义正文里对markdown语法有特殊意义的字符
  250. """
  251. special_chars = ["*", "`", "~", "$"]
  252. for char in special_chars:
  253. content = content.replace(char, "\\" + char)
  254. return content