# pipeline_middle_json_mkcontent.py (14 KB)
  1. import re
  2. from loguru import logger
  3. from mineru.utils.config_reader import get_latex_delimiter_config
  4. from mineru.backend.pipeline.para_split import ListLineTag
  5. from mineru.utils.enum_class import BlockType, ContentType, MakeMode
  6. from mineru.utils.language import detect_lang
  7. def __is_hyphen_at_line_end(line):
  8. """Check if a line ends with one or more letters followed by a hyphen.
  9. Args:
  10. line (str): The line of text to check.
  11. Returns:
  12. bool: True if the line ends with one or more letters followed by a hyphen, False otherwise.
  13. """
  14. # Use regex to check if the line ends with one or more letters followed by a hyphen
  15. return bool(re.search(r'[A-Za-z]+-\s*$', line))
  16. def make_blocks_to_markdown(paras_of_layout,
  17. mode,
  18. img_buket_path='',
  19. ):
  20. page_markdown = []
  21. for para_block in paras_of_layout:
  22. para_text = ''
  23. para_type = para_block['type']
  24. if para_type in [BlockType.TEXT, BlockType.LIST, BlockType.INDEX]:
  25. para_text = merge_para_with_text(para_block)
  26. elif para_type == BlockType.TITLE:
  27. title_level = get_title_level(para_block)
  28. para_text = f'{"#" * title_level} {merge_para_with_text(para_block)}'
  29. elif para_type == BlockType.INTERLINE_EQUATION:
  30. if len(para_block['lines']) == 0 or len(para_block['lines'][0]['spans']) == 0:
  31. continue
  32. if para_block['lines'][0]['spans'][0].get('content', ''):
  33. para_text = merge_para_with_text(para_block)
  34. else:
  35. para_text += f"![]({img_buket_path}/{para_block['lines'][0]['spans'][0]['image_path']})"
  36. elif para_type == BlockType.IMAGE:
  37. if mode == MakeMode.NLP_MD:
  38. continue
  39. elif mode == MakeMode.MM_MD:
  40. # 检测是否存在图片脚注
  41. has_image_footnote = any(block['type'] == BlockType.IMAGE_FOOTNOTE for block in para_block['blocks'])
  42. # 如果存在图片脚注,则将图片脚注拼接到图片正文后面
  43. if has_image_footnote:
  44. for block in para_block['blocks']: # 1st.拼image_caption
  45. if block['type'] == BlockType.IMAGE_CAPTION:
  46. para_text += merge_para_with_text(block) + ' \n'
  47. for block in para_block['blocks']: # 2nd.拼image_body
  48. if block['type'] == BlockType.IMAGE_BODY:
  49. for line in block['lines']:
  50. for span in line['spans']:
  51. if span['type'] == ContentType.IMAGE:
  52. if span.get('image_path', ''):
  53. para_text += f"![]({img_buket_path}/{span['image_path']})"
  54. for block in para_block['blocks']: # 3rd.拼image_footnote
  55. if block['type'] == BlockType.IMAGE_FOOTNOTE:
  56. para_text += ' \n' + merge_para_with_text(block)
  57. else:
  58. for block in para_block['blocks']: # 1st.拼image_body
  59. if block['type'] == BlockType.IMAGE_BODY:
  60. for line in block['lines']:
  61. for span in line['spans']:
  62. if span['type'] == ContentType.IMAGE:
  63. if span.get('image_path', ''):
  64. para_text += f"![]({img_buket_path}/{span['image_path']})"
  65. for block in para_block['blocks']: # 2nd.拼image_caption
  66. if block['type'] == BlockType.IMAGE_CAPTION:
  67. para_text += ' \n' + merge_para_with_text(block)
  68. elif para_type == BlockType.TABLE:
  69. if mode == MakeMode.NLP_MD:
  70. continue
  71. elif mode == MakeMode.MM_MD:
  72. for block in para_block['blocks']: # 1st.拼table_caption
  73. if block['type'] == BlockType.TABLE_CAPTION:
  74. para_text += merge_para_with_text(block) + ' \n'
  75. for block in para_block['blocks']: # 2nd.拼table_body
  76. if block['type'] == BlockType.TABLE_BODY:
  77. for line in block['lines']:
  78. for span in line['spans']:
  79. if span['type'] == ContentType.TABLE:
  80. # if processed by table model
  81. if span.get('html', ''):
  82. para_text += f"\n{span['html']}\n"
  83. elif span.get('image_path', ''):
  84. para_text += f"![]({img_buket_path}/{span['image_path']})"
  85. for block in para_block['blocks']: # 3rd.拼table_footnote
  86. if block['type'] == BlockType.TABLE_FOOTNOTE:
  87. para_text += '\n' + merge_para_with_text(block) + ' '
  88. if para_text.strip() == '':
  89. continue
  90. else:
  91. # page_markdown.append(para_text.strip() + ' ')
  92. page_markdown.append(para_text.strip())
  93. return page_markdown
  94. def full_to_half(text: str) -> str:
  95. """Convert full-width characters to half-width characters using code point manipulation.
  96. Args:
  97. text: String containing full-width characters
  98. Returns:
  99. String with full-width characters converted to half-width
  100. """
  101. result = []
  102. for char in text:
  103. code = ord(char)
  104. # Full-width letters and numbers (FF21-FF3A for A-Z, FF41-FF5A for a-z, FF10-FF19 for 0-9)
  105. if (0xFF21 <= code <= 0xFF3A) or (0xFF41 <= code <= 0xFF5A) or (0xFF10 <= code <= 0xFF19):
  106. result.append(chr(code - 0xFEE0)) # Shift to ASCII range
  107. else:
  108. result.append(char)
  109. return ''.join(result)
  110. latex_delimiters_config = get_latex_delimiter_config()
  111. default_delimiters = {
  112. 'display': {'left': '$$', 'right': '$$'},
  113. 'inline': {'left': '$', 'right': '$'}
  114. }
  115. delimiters = latex_delimiters_config if latex_delimiters_config else default_delimiters
  116. display_left_delimiter = delimiters['display']['left']
  117. display_right_delimiter = delimiters['display']['right']
  118. inline_left_delimiter = delimiters['inline']['left']
  119. inline_right_delimiter = delimiters['inline']['right']
  120. def merge_para_with_text(para_block):
  121. block_text = ''
  122. for line in para_block['lines']:
  123. for span in line['spans']:
  124. if span['type'] in [ContentType.TEXT]:
  125. span['content'] = full_to_half(span['content'])
  126. block_text += span['content']
  127. block_lang = detect_lang(block_text)
  128. para_text = ''
  129. for i, line in enumerate(para_block['lines']):
  130. if i >= 1 and line.get(ListLineTag.IS_LIST_START_LINE, False):
  131. para_text += ' \n'
  132. for j, span in enumerate(line['spans']):
  133. span_type = span['type']
  134. content = ''
  135. if span_type == ContentType.TEXT:
  136. content = escape_special_markdown_char(span['content'])
  137. elif span_type == ContentType.INLINE_EQUATION:
  138. if span.get('content', ''):
  139. content = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}"
  140. elif span_type == ContentType.INTERLINE_EQUATION:
  141. if span.get('content', ''):
  142. content = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n"
  143. content = content.strip()
  144. if content:
  145. langs = ['zh', 'ja', 'ko']
  146. # logger.info(f'block_lang: {block_lang}, content: {content}')
  147. if block_lang in langs: # 中文/日语/韩文语境下,换行不需要空格分隔,但是如果是行内公式结尾,还是要加空格
  148. if j == len(line['spans']) - 1 and span_type not in [ContentType.INLINE_EQUATION]:
  149. para_text += content
  150. else:
  151. para_text += f'{content} '
  152. else:
  153. if span_type in [ContentType.TEXT, ContentType.INLINE_EQUATION]:
  154. # 如果span是line的最后一个且末尾带有-连字符,那么末尾不应该加空格,同时应该把-删除
  155. if j == len(line['spans'])-1 and span_type == ContentType.TEXT and __is_hyphen_at_line_end(content):
  156. para_text += content[:-1]
  157. else: # 西方文本语境下 content间需要空格分隔
  158. para_text += f'{content} '
  159. elif span_type == ContentType.INTERLINE_EQUATION:
  160. para_text += content
  161. else:
  162. continue
  163. return para_text
  164. def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size):
  165. para_type = para_block['type']
  166. para_content = {}
  167. if para_type in [
  168. BlockType.TEXT,
  169. BlockType.LIST,
  170. BlockType.INDEX,
  171. ]:
  172. para_content = {
  173. 'type': ContentType.TEXT,
  174. 'text': merge_para_with_text(para_block),
  175. }
  176. elif para_type == BlockType.DISCARDED:
  177. para_content = {
  178. 'type': para_type,
  179. 'text': merge_para_with_text(para_block),
  180. }
  181. elif para_type == BlockType.TITLE:
  182. para_content = {
  183. 'type': ContentType.TEXT,
  184. 'text': merge_para_with_text(para_block),
  185. }
  186. title_level = get_title_level(para_block)
  187. if title_level != 0:
  188. para_content['text_level'] = title_level
  189. elif para_type == BlockType.INTERLINE_EQUATION:
  190. if len(para_block['lines']) == 0 or len(para_block['lines'][0]['spans']) == 0:
  191. return None
  192. para_content = {
  193. 'type': ContentType.EQUATION,
  194. 'img_path': f"{img_buket_path}/{para_block['lines'][0]['spans'][0].get('image_path', '')}",
  195. }
  196. if para_block['lines'][0]['spans'][0].get('content', ''):
  197. para_content['text'] = merge_para_with_text(para_block)
  198. para_content['text_format'] = 'latex'
  199. elif para_type == BlockType.IMAGE:
  200. para_content = {'type': ContentType.IMAGE, 'img_path': '', BlockType.IMAGE_CAPTION: [], BlockType.IMAGE_FOOTNOTE: []}
  201. for block in para_block['blocks']:
  202. if block['type'] == BlockType.IMAGE_BODY:
  203. for line in block['lines']:
  204. for span in line['spans']:
  205. if span['type'] == ContentType.IMAGE:
  206. if span.get('image_path', ''):
  207. para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
  208. if block['type'] == BlockType.IMAGE_CAPTION:
  209. para_content[BlockType.IMAGE_CAPTION].append(merge_para_with_text(block))
  210. if block['type'] == BlockType.IMAGE_FOOTNOTE:
  211. para_content[BlockType.IMAGE_FOOTNOTE].append(merge_para_with_text(block))
  212. elif para_type == BlockType.TABLE:
  213. para_content = {'type': ContentType.TABLE, 'img_path': '', BlockType.TABLE_CAPTION: [], BlockType.TABLE_FOOTNOTE: []}
  214. for block in para_block['blocks']:
  215. if block['type'] == BlockType.TABLE_BODY:
  216. for line in block['lines']:
  217. for span in line['spans']:
  218. if span['type'] == ContentType.TABLE:
  219. if span.get('html', ''):
  220. para_content[BlockType.TABLE_BODY] = f"{span['html']}"
  221. if span.get('image_path', ''):
  222. para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
  223. if block['type'] == BlockType.TABLE_CAPTION:
  224. para_content[BlockType.TABLE_CAPTION].append(merge_para_with_text(block))
  225. if block['type'] == BlockType.TABLE_FOOTNOTE:
  226. para_content[BlockType.TABLE_FOOTNOTE].append(merge_para_with_text(block))
  227. page_width, page_height = page_size
  228. para_bbox = para_block.get('bbox')
  229. if para_bbox:
  230. x0, y0, x1, y1 = para_bbox
  231. para_content['bbox'] = [
  232. int(x0 * 1000 / page_width),
  233. int(y0 * 1000 / page_height),
  234. int(x1 * 1000 / page_width),
  235. int(y1 * 1000 / page_height),
  236. ]
  237. para_content['page_idx'] = page_idx
  238. return para_content
  239. def union_make(pdf_info_dict: list,
  240. make_mode: str,
  241. img_buket_path: str = '',
  242. ):
  243. output_content = []
  244. for page_info in pdf_info_dict:
  245. paras_of_layout = page_info.get('para_blocks')
  246. paras_of_discarded = page_info.get('discarded_blocks')
  247. page_idx = page_info.get('page_idx')
  248. page_size = page_info.get('page_size')
  249. if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
  250. if not paras_of_layout:
  251. continue
  252. page_markdown = make_blocks_to_markdown(paras_of_layout, make_mode, img_buket_path)
  253. output_content.extend(page_markdown)
  254. elif make_mode == MakeMode.CONTENT_LIST:
  255. if not paras_of_layout + paras_of_discarded:
  256. continue
  257. for para_block in paras_of_layout + paras_of_discarded:
  258. para_content = make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
  259. if para_content:
  260. output_content.append(para_content)
  261. if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
  262. return '\n\n'.join(output_content)
  263. elif make_mode == MakeMode.CONTENT_LIST:
  264. return output_content
  265. else:
  266. logger.error(f"Unsupported make mode: {make_mode}")
  267. return None
  268. def get_title_level(block):
  269. title_level = block.get('level', 1)
  270. if title_level > 4:
  271. title_level = 4
  272. elif title_level < 1:
  273. title_level = 0
  274. return title_level
  275. def escape_special_markdown_char(content):
  276. """
  277. 转义正文里对markdown语法有特殊意义的字符
  278. """
  279. special_chars = ["*", "`", "~", "$"]
  280. for char in special_chars:
  281. content = content.replace(char, "\\" + char)
  282. return content