pipeline_middle_json_mkcontent.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. import re
  2. from loguru import logger
  3. from mineru.backend.pipeline.config_reader import get_latex_delimiter_config
  4. from mineru.backend.pipeline.para_split import ListLineTag
  5. from mineru.utils.enum_class import BlockType, ContentType, MakeMode
  6. from mineru.utils.language import detect_lang
  7. def __is_hyphen_at_line_end(line):
  8. """Check if a line ends with one or more letters followed by a hyphen.
  9. Args:
  10. line (str): The line of text to check.
  11. Returns:
  12. bool: True if the line ends with one or more letters followed by a hyphen, False otherwise.
  13. """
  14. # Use regex to check if the line ends with one or more letters followed by a hyphen
  15. return bool(re.search(r'[A-Za-z]+-\s*$', line))
  16. def ocr_mk_markdown_with_para_core_v2(paras_of_layout,
  17. mode,
  18. img_buket_path='',
  19. ):
  20. page_markdown = []
  21. for para_block in paras_of_layout:
  22. para_text = ''
  23. para_type = para_block['type']
  24. if para_type in [BlockType.TEXT, BlockType.LIST, BlockType.INDEX]:
  25. para_text = merge_para_with_text(para_block)
  26. elif para_type == BlockType.TITLE:
  27. title_level = get_title_level(para_block)
  28. para_text = f'{"#" * title_level} {merge_para_with_text(para_block)}'
  29. elif para_type == BlockType.INTERLINE_EQUATION:
  30. para_text = merge_para_with_text(para_block)
  31. elif para_type == BlockType.IMAGE:
  32. if mode == 'nlp':
  33. continue
  34. elif mode == 'mm':
  35. # 检测是否存在图片脚注
  36. has_image_footnote = any(block['type'] == BlockType.IMAGE_FOOTNOTE for block in para_block['blocks'])
  37. # 如果存在图片脚注,则将图片脚注拼接到图片正文后面
  38. if has_image_footnote:
  39. for block in para_block['blocks']: # 1st.拼image_caption
  40. if block['type'] == BlockType.IMAGE_CAPTION:
  41. para_text += merge_para_with_text(block) + ' \n'
  42. for block in para_block['blocks']: # 2nd.拼image_body
  43. if block['type'] == BlockType.IMAGE_BODY:
  44. for line in block['lines']:
  45. for span in line['spans']:
  46. if span['type'] == ContentType.IMAGE:
  47. if span.get('image_path', ''):
  48. para_text += f"![]({img_buket_path}/{span['image_path']})"
  49. for block in para_block['blocks']: # 3rd.拼image_footnote
  50. if block['type'] == BlockType.IMAGE_FOOTNOTE:
  51. para_text += ' \n' + merge_para_with_text(block)
  52. else:
  53. for block in para_block['blocks']: # 1st.拼image_body
  54. if block['type'] == BlockType.IMAGE_BODY:
  55. for line in block['lines']:
  56. for span in line['spans']:
  57. if span['type'] == ContentType.IMAGE:
  58. if span.get('image_path', ''):
  59. para_text += f"![]({img_buket_path}/{span['image_path']})"
  60. for block in para_block['blocks']: # 2nd.拼image_caption
  61. if block['type'] == BlockType.IMAGE_CAPTION:
  62. para_text += ' \n' + merge_para_with_text(block)
  63. elif para_type == BlockType.TABLE:
  64. if mode == 'nlp':
  65. continue
  66. elif mode == 'mm':
  67. for block in para_block['blocks']: # 1st.拼table_caption
  68. if block['type'] == BlockType.TABLE_CAPTION:
  69. para_text += merge_para_with_text(block) + ' \n'
  70. for block in para_block['blocks']: # 2nd.拼table_body
  71. if block['type'] == BlockType.TABLE_BODY:
  72. for line in block['lines']:
  73. for span in line['spans']:
  74. if span['type'] == ContentType.TABLE:
  75. # if processed by table model
  76. if span.get('html', ''):
  77. para_text += f"\n{span['html']}\n"
  78. elif span.get('image_path', ''):
  79. para_text += f"![]({img_buket_path}/{span['image_path']})"
  80. for block in para_block['blocks']: # 3rd.拼table_footnote
  81. if block['type'] == BlockType.TABLE_FOOTNOTE:
  82. para_text += '\n' + merge_para_with_text(block) + ' '
  83. if para_text.strip() == '':
  84. continue
  85. else:
  86. # page_markdown.append(para_text.strip() + ' ')
  87. page_markdown.append(para_text.strip())
  88. return page_markdown
  89. def full_to_half(text: str) -> str:
  90. """Convert full-width characters to half-width characters using code point manipulation.
  91. Args:
  92. text: String containing full-width characters
  93. Returns:
  94. String with full-width characters converted to half-width
  95. """
  96. result = []
  97. for char in text:
  98. code = ord(char)
  99. # Full-width letters and numbers (FF21-FF3A for A-Z, FF41-FF5A for a-z, FF10-FF19 for 0-9)
  100. if (0xFF21 <= code <= 0xFF3A) or (0xFF41 <= code <= 0xFF5A) or (0xFF10 <= code <= 0xFF19):
  101. result.append(chr(code - 0xFEE0)) # Shift to ASCII range
  102. else:
  103. result.append(char)
  104. return ''.join(result)
  105. latex_delimiters_config = get_latex_delimiter_config()
  106. default_delimiters = {
  107. 'display': {'left': '$$', 'right': '$$'},
  108. 'inline': {'left': '$', 'right': '$'}
  109. }
  110. delimiters = latex_delimiters_config if latex_delimiters_config else default_delimiters
  111. display_left_delimiter = delimiters['display']['left']
  112. display_right_delimiter = delimiters['display']['right']
  113. inline_left_delimiter = delimiters['inline']['left']
  114. inline_right_delimiter = delimiters['inline']['right']
  115. def merge_para_with_text(para_block):
  116. block_text = ''
  117. for line in para_block['lines']:
  118. for span in line['spans']:
  119. if span['type'] in [ContentType.TEXT]:
  120. span['content'] = full_to_half(span['content'])
  121. block_text += span['content']
  122. block_lang = detect_lang(block_text)
  123. para_text = ''
  124. for i, line in enumerate(para_block['lines']):
  125. if i >= 1 and line.get(ListLineTag.IS_LIST_START_LINE, False):
  126. para_text += ' \n'
  127. for j, span in enumerate(line['spans']):
  128. span_type = span['type']
  129. content = ''
  130. if span_type == ContentType.TEXT:
  131. content = ocr_escape_special_markdown_char(span['content'])
  132. elif span_type == ContentType.INLINE_EQUATION:
  133. content = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}"
  134. elif span_type == ContentType.INTERLINE_EQUATION:
  135. content = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n"
  136. content = content.strip()
  137. if content:
  138. langs = ['zh', 'ja', 'ko']
  139. # logger.info(f'block_lang: {block_lang}, content: {content}')
  140. if block_lang in langs: # 中文/日语/韩文语境下,换行不需要空格分隔,但是如果是行内公式结尾,还是要加空格
  141. if j == len(line['spans']) - 1 and span_type not in [ContentType.INLINE_EQUATION]:
  142. para_text += content
  143. else:
  144. para_text += f'{content} '
  145. else:
  146. if span_type in [ContentType.TEXT, ContentType.INLINE_EQUATION]:
  147. # 如果span是line的最后一个且末尾带有-连字符,那么末尾不应该加空格,同时应该把-删除
  148. if j == len(line['spans'])-1 and span_type == ContentType.TEXT and __is_hyphen_at_line_end(content):
  149. para_text += content[:-1]
  150. else: # 西方文本语境下 content间需要空格分隔
  151. para_text += f'{content} '
  152. elif span_type == ContentType.INTERLINE_EQUATION:
  153. para_text += content
  154. else:
  155. continue
  156. return para_text
  157. def para_to_standard_format_v2(para_block, img_buket_path, page_idx):
  158. para_type = para_block['type']
  159. para_content = {}
  160. if para_type in [BlockType.TEXT, BlockType.LIST, BlockType.INDEX]:
  161. para_content = {
  162. 'type': 'text',
  163. 'text': merge_para_with_text(para_block),
  164. }
  165. elif para_type == BlockType.TITLE:
  166. para_content = {
  167. 'type': 'text',
  168. 'text': merge_para_with_text(para_block),
  169. }
  170. title_level = get_title_level(para_block)
  171. if title_level != 0:
  172. para_content['text_level'] = title_level
  173. elif para_type == BlockType.INTERLINE_EQUATION:
  174. para_content = {
  175. 'type': 'equation',
  176. 'text': merge_para_with_text(para_block),
  177. 'text_format': 'latex',
  178. }
  179. elif para_type == BlockType.IMAGE:
  180. para_content = {'type': 'image', 'img_path': '', 'img_caption': [], 'img_footnote': []}
  181. for block in para_block['blocks']:
  182. if block['type'] == BlockType.IMAGE_BODY:
  183. for line in block['lines']:
  184. for span in line['spans']:
  185. if span['type'] == ContentType.IMAGE:
  186. if span.get('image_path', ''):
  187. para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
  188. if block['type'] == BlockType.IMAGE_CAPTION:
  189. para_content['img_caption'].append(merge_para_with_text(block))
  190. if block['type'] == BlockType.IMAGE_FOOTNOTE:
  191. para_content['img_footnote'].append(merge_para_with_text(block))
  192. elif para_type == BlockType.TABLE:
  193. para_content = {'type': 'table', 'img_path': '', 'table_caption': [], 'table_footnote': []}
  194. for block in para_block['blocks']:
  195. if block['type'] == BlockType.TABLE_BODY:
  196. for line in block['lines']:
  197. for span in line['spans']:
  198. if span['type'] == ContentType.TABLE:
  199. if span.get('latex', ''):
  200. para_content['table_body'] = f"{span['latex']}"
  201. elif span.get('html', ''):
  202. para_content['table_body'] = f"{span['html']}"
  203. if span.get('image_path', ''):
  204. para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
  205. if block['type'] == BlockType.TABLE_CAPTION:
  206. para_content['table_caption'].append(merge_para_with_text(block))
  207. if block['type'] == BlockType.TABLE_FOOTNOTE:
  208. para_content['table_footnote'].append(merge_para_with_text(block))
  209. para_content['page_idx'] = page_idx
  210. return para_content
  211. def union_make(pdf_info_dict: list,
  212. make_mode: str,
  213. img_buket_path: str = '',
  214. ):
  215. output_content = []
  216. for page_info in pdf_info_dict:
  217. paras_of_layout = page_info.get('para_blocks')
  218. page_idx = page_info.get('page_idx')
  219. if not paras_of_layout:
  220. continue
  221. if make_mode == MakeMode.MM_MD:
  222. page_markdown = ocr_mk_markdown_with_para_core_v2(paras_of_layout, 'mm', img_buket_path)
  223. output_content.extend(page_markdown)
  224. elif make_mode == MakeMode.NLP_MD:
  225. page_markdown = ocr_mk_markdown_with_para_core_v2(paras_of_layout, 'nlp')
  226. output_content.extend(page_markdown)
  227. elif make_mode == MakeMode.STANDARD_FORMAT:
  228. for para_block in paras_of_layout:
  229. para_content = para_to_standard_format_v2(para_block, img_buket_path, page_idx)
  230. output_content.append(para_content)
  231. if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
  232. return '\n\n'.join(output_content)
  233. elif make_mode == MakeMode.STANDARD_FORMAT:
  234. return output_content
  235. else:
  236. logger.error(f"Unsupported make mode: {make_mode}")
  237. return None
  238. def get_title_level(block):
  239. title_level = block.get('level', 1)
  240. if title_level > 4:
  241. title_level = 4
  242. elif title_level < 1:
  243. title_level = 0
  244. return title_level
  245. def ocr_escape_special_markdown_char(content):
  246. """
  247. 转义正文里对markdown语法有特殊意义的字符
  248. """
  249. special_chars = ["*", "`", "~", "$"]
  250. for char in special_chars:
  251. content = content.replace(char, "\\" + char)
  252. return content