"""Replace the formulas in the structure returned by PyMuPDF with the formula results recognised by the model."""
  2. import json
  3. import os
  4. from pathlib import Path
  5. from loguru import logger
  6. from magic_pdf.config.ocr_content_type import ContentType
  7. from magic_pdf.libs.commons import fitz
  8. TYPE_INLINE_EQUATION = ContentType.InlineEquation
  9. TYPE_INTERLINE_EQUATION = ContentType.InterlineEquation
  10. def combine_chars_to_pymudict(block_dict, char_dict):
  11. """把block级别的pymupdf 结构里加入char结构."""
  12. # 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充
  13. char_map = {tuple(item['bbox']): item for item in char_dict}
  14. for i in range(len(block_dict)): # block
  15. block = block_dict[i]
  16. key = block['bbox']
  17. char_dict_item = char_map[tuple(key)]
  18. char_dict_map = {tuple(item['bbox']): item for item in char_dict_item['lines']}
  19. for j in range(len(block['lines'])):
  20. lines = block['lines'][j]
  21. with_char_lines = char_dict_map[lines['bbox']]
  22. for k in range(len(lines['spans'])):
  23. spans = lines['spans'][k]
  24. try:
  25. chars = with_char_lines['spans'][k]['chars']
  26. except Exception:
  27. logger.error(char_dict[i]['lines'][j])
  28. spans['chars'] = chars
  29. return block_dict
  30. def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox):
  31. """计算box1和box2的重叠面积占最小面积的box的比例."""
  32. # Determine the coordinates of the intersection rectangle
  33. x_left = max(bbox1[0], min_bbox[0])
  34. y_top = max(bbox1[1], min_bbox[1])
  35. x_right = min(bbox1[2], min_bbox[2])
  36. y_bottom = min(bbox1[3], min_bbox[3])
  37. if x_right < x_left or y_bottom < y_top:
  38. return 0.0
  39. # The area of overlap area
  40. intersection_area = (x_right - x_left) * (y_bottom - y_top)
  41. min_box_area = (min_bbox[3] - min_bbox[1]) * (min_bbox[2] - min_bbox[0])
  42. if min_box_area == 0:
  43. return 0
  44. else:
  45. return intersection_area / min_box_area
  46. def _is_xin(bbox1, bbox2):
  47. area1 = abs(bbox1[2] - bbox1[0]) * abs(bbox1[3] - bbox1[1])
  48. area2 = abs(bbox2[2] - bbox2[0]) * abs(bbox2[3] - bbox2[1])
  49. if area1 < area2:
  50. ratio = calculate_overlap_area_2_minbox_area_ratio(bbox2, bbox1)
  51. else:
  52. ratio = calculate_overlap_area_2_minbox_area_ratio(bbox1, bbox2)
  53. return ratio > 0.6
  54. def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
  55. """消除掉整个块都在行间公式块内部的文本块."""
  56. for eq_bbox in interline_bboxes:
  57. removed_txt_blk = []
  58. for text_blk in text_blocks:
  59. text_bbox = text_blk['bbox']
  60. if (
  61. calculate_overlap_area_2_minbox_area_ratio(eq_bbox['bbox'], text_bbox)
  62. >= 0.7
  63. ):
  64. removed_txt_blk.append(text_blk)
  65. for blk in removed_txt_blk:
  66. text_blocks.remove(blk)
  67. return text_blocks
  68. def _is_in_or_part_overlap(box1, box2) -> bool:
  69. """两个bbox是否有部分重叠或者包含."""
  70. if box1 is None or box2 is None:
  71. return False
  72. x0_1, y0_1, x1_1, y1_1 = box1
  73. x0_2, y0_2, x1_2, y1_2 = box2
  74. return not (
  75. x1_1 < x0_2 # box1在box2的左边
  76. or x0_1 > x1_2 # box1在box2的右边
  77. or y1_1 < y0_2 # box1在box2的上边
  78. or y0_1 > y1_2
  79. ) # box1在box2的下边
  80. def remove_text_block_overlap_interline_equation_bbox(
  81. interline_eq_bboxes, pymu_block_list
  82. ):
  83. """消除掉行行内公式有部分重叠的文本块的内容。 同时重新计算消除重叠之后文本块的大小."""
  84. deleted_block = []
  85. for text_block in pymu_block_list:
  86. deleted_line = []
  87. for line in text_block['lines']:
  88. deleted_span = []
  89. for span in line['spans']:
  90. deleted_chars = []
  91. for char in span['chars']:
  92. if any(
  93. [
  94. (
  95. calculate_overlap_area_2_minbox_area_ratio(
  96. eq_bbox['bbox'], char['bbox']
  97. )
  98. > 0.5
  99. )
  100. for eq_bbox in interline_eq_bboxes
  101. ]
  102. ):
  103. deleted_chars.append(char)
  104. # 检查span里没有char则删除这个span
  105. for char in deleted_chars:
  106. span['chars'].remove(char)
  107. # 重新计算这个span的大小
  108. if len(span['chars']) == 0: # 删除这个span
  109. deleted_span.append(span)
  110. else:
  111. span['bbox'] = (
  112. min([b['bbox'][0] for b in span['chars']]),
  113. min([b['bbox'][1] for b in span['chars']]),
  114. max([b['bbox'][2] for b in span['chars']]),
  115. max([b['bbox'][3] for b in span['chars']]),
  116. )
  117. # 检查这个span
  118. for span in deleted_span:
  119. line['spans'].remove(span)
  120. if len(line['spans']) == 0: # 删除这个line
  121. deleted_line.append(line)
  122. else:
  123. line['bbox'] = (
  124. min([b['bbox'][0] for b in line['spans']]),
  125. min([b['bbox'][1] for b in line['spans']]),
  126. max([b['bbox'][2] for b in line['spans']]),
  127. max([b['bbox'][3] for b in line['spans']]),
  128. )
  129. # 检查这个block是否可以删除
  130. for line in deleted_line:
  131. text_block['lines'].remove(line)
  132. if len(text_block['lines']) == 0: # 删除block
  133. deleted_block.append(text_block)
  134. else:
  135. text_block['bbox'] = (
  136. min([b['bbox'][0] for b in text_block['lines']]),
  137. min([b['bbox'][1] for b in text_block['lines']]),
  138. max([b['bbox'][2] for b in text_block['lines']]),
  139. max([b['bbox'][3] for b in text_block['lines']]),
  140. )
  141. # 检查text block删除
  142. for block in deleted_block:
  143. pymu_block_list.remove(block)
  144. if len(pymu_block_list) == 0:
  145. return []
  146. return pymu_block_list
  147. def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
  148. """在行间公式对应的地方插上一个伪造的block."""
  149. for eq in interline_eq_bboxes:
  150. bbox = eq['bbox']
  151. latex_content = eq['latex']
  152. text_block = {
  153. 'number': len(pymu_block_list),
  154. 'type': 0,
  155. 'bbox': bbox,
  156. 'lines': [
  157. {
  158. 'spans': [
  159. {
  160. 'size': 9.962599754333496,
  161. 'type': TYPE_INTERLINE_EQUATION,
  162. 'flags': 4,
  163. 'font': TYPE_INTERLINE_EQUATION,
  164. 'color': 0,
  165. 'ascender': 0.9409999847412109,
  166. 'descender': -0.3050000071525574,
  167. 'latex': latex_content,
  168. 'origin': [bbox[0], bbox[1]],
  169. 'bbox': bbox,
  170. }
  171. ],
  172. 'wmode': 0,
  173. 'dir': [1.0, 0.0],
  174. 'bbox': bbox,
  175. }
  176. ],
  177. }
  178. pymu_block_list.append(text_block)
  179. def x_overlap_ratio(box1, box2):
  180. a, _, c, _ = box1
  181. e, _, g, _ = box2
  182. # 计算重叠宽度
  183. overlap_x = max(min(c, g) - max(a, e), 0)
  184. # 计算box1的宽度
  185. width1 = g - e
  186. # 计算重叠比例
  187. overlap_ratio = overlap_x / width1 if width1 != 0 else 0
  188. return overlap_ratio
  189. def __is_x_dir_overlap(bbox1, bbox2):
  190. return not (bbox1[2] < bbox2[0] or bbox1[0] > bbox2[2])
  191. def __y_overlap_ratio(box1, box2):
  192. """"""
  193. _, b, _, d = box1
  194. _, f, _, h = box2
  195. # 计算重叠高度
  196. overlap_y = max(min(d, h) - max(b, f), 0)
  197. # 计算box1的高度
  198. height1 = d - b
  199. # 计算重叠比例
  200. overlap_ratio = overlap_y / height1 if height1 != 0 else 0
  201. return overlap_ratio
  202. def replace_line_v2(eqinfo, line):
  203. """扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。
  204. 最后与这个x0,x1有相交的span0, span1内部进行分割。"""
  205. first_overlap_span = -1
  206. first_overlap_span_idx = -1
  207. last_overlap_span = -1
  208. delete_chars = []
  209. for i in range(0, len(line['spans'])):
  210. if 'chars' not in line['spans'][i]:
  211. continue
  212. if line['spans'][i].get('_type', None) is not None:
  213. continue # 忽略,因为已经是插入的伪造span公式了
  214. for char in line['spans'][i]['chars']:
  215. if __is_x_dir_overlap(eqinfo['bbox'], char['bbox']):
  216. line_txt = ''
  217. for span in line['spans']:
  218. span_txt = '<span>'
  219. for ch in span['chars']:
  220. span_txt = span_txt + ch['c']
  221. span_txt = span_txt + '</span>'
  222. line_txt = line_txt + span_txt
  223. if first_overlap_span_idx == -1:
  224. first_overlap_span = line['spans'][i]
  225. first_overlap_span_idx = i
  226. last_overlap_span = line['spans'][i]
  227. delete_chars.append(char)
  228. # 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多
  229. if len(delete_chars) > 0:
  230. ch0_bbox = delete_chars[0]['bbox']
  231. if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51:
  232. delete_chars.remove(delete_chars[0])
  233. if len(delete_chars) > 0:
  234. ch0_bbox = delete_chars[-1]['bbox']
  235. if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51:
  236. delete_chars.remove(delete_chars[-1])
  237. # 计算x方向上被删除区间内的char的真实x0, x1
  238. if len(delete_chars):
  239. x0, x1 = (
  240. min([b['bbox'][0] for b in delete_chars]),
  241. max([b['bbox'][2] for b in delete_chars]),
  242. )
  243. else:
  244. # logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}")
  245. return False
  246. # 删除位于x0, x1这两个中间的span
  247. delete_span = []
  248. for span in line['spans']:
  249. span_box = span['bbox']
  250. if x0 <= span_box[0] and span_box[2] <= x1:
  251. delete_span.append(span)
  252. for span in delete_span:
  253. line['spans'].remove(span)
  254. equation_span = {
  255. 'size': 9.962599754333496,
  256. 'type': TYPE_INLINE_EQUATION,
  257. 'flags': 4,
  258. 'font': TYPE_INLINE_EQUATION,
  259. 'color': 0,
  260. 'ascender': 0.9409999847412109,
  261. 'descender': -0.3050000071525574,
  262. 'latex': '',
  263. 'origin': [337.1410153102337, 216.0205245153934],
  264. 'bbox': eqinfo['bbox'],
  265. }
  266. # equation_span = line['spans'][0].copy()
  267. equation_span['latex'] = eqinfo['latex']
  268. equation_span['bbox'] = [x0, equation_span['bbox'][1], x1, equation_span['bbox'][3]]
  269. equation_span['origin'] = [equation_span['bbox'][0], equation_span['bbox'][1]]
  270. equation_span['chars'] = delete_chars
  271. equation_span['type'] = TYPE_INLINE_EQUATION
  272. equation_span['_eq_bbox'] = eqinfo['bbox']
  273. line['spans'].insert(first_overlap_span_idx + 1, equation_span) # 放入公式
  274. # logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】")
  275. # 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置
  276. first_span_chars = [
  277. char
  278. for char in first_overlap_span['chars']
  279. if (char['bbox'][2] + char['bbox'][0]) / 2 < x0
  280. ]
  281. tail_span_chars = [
  282. char
  283. for char in last_overlap_span['chars']
  284. if (char['bbox'][0] + char['bbox'][2]) / 2 > x1
  285. ]
  286. if len(first_span_chars) > 0:
  287. first_overlap_span['chars'] = first_span_chars
  288. first_overlap_span['text'] = ''.join([char['c'] for char in first_span_chars])
  289. first_overlap_span['bbox'] = (
  290. first_overlap_span['bbox'][0],
  291. first_overlap_span['bbox'][1],
  292. max([chr['bbox'][2] for chr in first_span_chars]),
  293. first_overlap_span['bbox'][3],
  294. )
  295. # first_overlap_span['_type'] = "first"
  296. else:
  297. # 删掉
  298. if first_overlap_span not in delete_span:
  299. line['spans'].remove(first_overlap_span)
  300. if len(tail_span_chars) > 0:
  301. min_of_tail_span_x0 = min([chr['bbox'][0] for chr in tail_span_chars])
  302. min_of_tail_span_y0 = min([chr['bbox'][1] for chr in tail_span_chars])
  303. max_of_tail_span_x1 = max([chr['bbox'][2] for chr in tail_span_chars])
  304. max_of_tail_span_y1 = max([chr['bbox'][3] for chr in tail_span_chars])
  305. if last_overlap_span == first_overlap_span: # 这个时候应该插入一个新的
  306. tail_span_txt = ''.join([char['c'] for char in tail_span_chars]) # noqa: F841
  307. last_span_to_insert = last_overlap_span.copy()
  308. last_span_to_insert['chars'] = tail_span_chars
  309. last_span_to_insert['text'] = ''.join(
  310. [char['c'] for char in tail_span_chars]
  311. )
  312. if equation_span['bbox'][2] >= last_overlap_span['bbox'][2]:
  313. last_span_to_insert['bbox'] = (
  314. min_of_tail_span_x0,
  315. min_of_tail_span_y0,
  316. max_of_tail_span_x1,
  317. max_of_tail_span_y1,
  318. )
  319. else:
  320. last_span_to_insert['bbox'] = (
  321. min([chr['bbox'][0] for chr in tail_span_chars]),
  322. last_overlap_span['bbox'][1],
  323. last_overlap_span['bbox'][2],
  324. last_overlap_span['bbox'][3],
  325. )
  326. # 插入到公式对象之后
  327. equation_idx = line['spans'].index(equation_span)
  328. line['spans'].insert(equation_idx + 1, last_span_to_insert) # 放入公式
  329. else: # 直接修改原来的span
  330. last_overlap_span['chars'] = tail_span_chars
  331. last_overlap_span['text'] = ''.join([char['c'] for char in tail_span_chars])
  332. last_overlap_span['bbox'] = (
  333. min([chr['bbox'][0] for chr in tail_span_chars]),
  334. last_overlap_span['bbox'][1],
  335. last_overlap_span['bbox'][2],
  336. last_overlap_span['bbox'][3],
  337. )
  338. else:
  339. # 删掉
  340. if (
  341. last_overlap_span not in delete_span
  342. and last_overlap_span != first_overlap_span
  343. ):
  344. line['spans'].remove(last_overlap_span)
  345. remain_txt = ''
  346. for span in line['spans']:
  347. span_txt = '<span>'
  348. for char in span['chars']:
  349. span_txt = span_txt + char['c']
  350. span_txt = span_txt + '</span>'
  351. remain_txt = remain_txt + span_txt
  352. # logger.info(f"<== succ replace, text is 【{remain_txt}】, equation is 【{eqinfo['latex_text']}】")
  353. return True
  354. def replace_eq_blk(eqinfo, text_block):
  355. """替换行内公式."""
  356. for line in text_block['lines']:
  357. line_bbox = line['bbox']
  358. if (
  359. _is_xin(eqinfo['bbox'], line_bbox)
  360. or __y_overlap_ratio(eqinfo['bbox'], line_bbox) > 0.6
  361. ): # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄,
  362. replace_succ = replace_line_v2(eqinfo, line)
  363. if not replace_succ: # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
  364. continue
  365. else:
  366. break
  367. else:
  368. return False
  369. return True
  370. def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
  371. """替换行内公式."""
  372. for eqinfo in inline_equation_bboxes:
  373. eqbox = eqinfo['bbox']
  374. for blk in raw_text_blocks:
  375. if _is_xin(eqbox, blk['bbox']):
  376. if not replace_eq_blk(eqinfo, blk):
  377. logger.warning(f'行内公式没有替换成功:{eqinfo} ')
  378. else:
  379. break
  380. return raw_text_blocks
  381. def remove_chars_in_text_blocks(text_blocks):
  382. """删除text_blocks里的char."""
  383. for blk in text_blocks:
  384. for line in blk['lines']:
  385. for span in line['spans']:
  386. _ = span.pop('chars', 'no such key')
  387. return text_blocks
  388. def replace_equations_in_textblock(
  389. raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes
  390. ):
  391. """替换行间和和行内公式为latex."""
  392. raw_text_blocks = remove_text_block_in_interline_equation_bbox(
  393. interline_equation_bboxes, raw_text_blocks
  394. ) # 消除重叠:第一步,在公式内部的
  395. raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
  396. interline_equation_bboxes, raw_text_blocks
  397. ) # 消重,第二步,和公式覆盖的
  398. insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
  399. raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks)
  400. return raw_text_blocks
  401. def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
  402. """"""
  403. new_pdf = f'{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf'
  404. with open(json_path, 'r', encoding='utf-8') as f:
  405. obj = json.loads(f.read())
  406. if os.path.exists(new_pdf):
  407. os.remove(new_pdf)
  408. new_doc = fitz.open('')
  409. doc = fitz.open(pdf_path) # noqa: F841
  410. new_doc = fitz.open(pdf_path)
  411. for i in range(len(new_doc)):
  412. page = new_doc[i]
  413. inline_equation_bboxes = obj[f'page_{i}']['inline_equations']
  414. interline_equation_bboxes = obj[f'page_{i}']['interline_equations']
  415. raw_text_blocks = obj[f'page_{i}']['preproc_blocks']
  416. raw_text_blocks = remove_text_block_in_interline_equation_bbox(
  417. interline_equation_bboxes, raw_text_blocks
  418. ) # 消除重叠:第一步,在公式内部的
  419. raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
  420. interline_equation_bboxes, raw_text_blocks
  421. ) # 消重,第二步,和公式覆盖的
  422. insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
  423. raw_text_blocks = replace_inline_equations(
  424. inline_equation_bboxes, raw_text_blocks
  425. )
  426. # 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的
  427. color_map = [fitz.pdfcolor['blue'], fitz.pdfcolor['green']] # noqa: F841
  428. j = 0 # noqa: F841
  429. for blk in raw_text_blocks:
  430. for i, line in enumerate(blk['lines']):
  431. # line_box = line['bbox']
  432. # shape = page.new_shape()
  433. # shape.draw_rect(line_box)
  434. # shape.finish(color=fitz.pdfcolor['red'], fill=color_map[j%2], fill_opacity=0.3)
  435. # shape.commit()
  436. # j = j+1
  437. for i, span in enumerate(line['spans']):
  438. shape_page = page.new_shape()
  439. span_type = span.get('_type')
  440. color = fitz.pdfcolor['blue']
  441. if span_type == 'first':
  442. color = fitz.pdfcolor['blue']
  443. elif span_type == 'tail':
  444. color = fitz.pdfcolor['green']
  445. elif span_type == TYPE_INLINE_EQUATION:
  446. color = fitz.pdfcolor['black']
  447. else:
  448. color = None
  449. b = span['bbox']
  450. shape_page.draw_rect(b)
  451. shape_page.finish(color=None, fill=color, fill_opacity=0.3)
  452. shape_page.commit()
  453. new_doc.save(new_pdf)
  454. logger.info(f'save ok {new_pdf}')
  455. final_json = json.dumps(obj, ensure_ascii=False, indent=2)
  456. with open('equations_test/final_json.json', 'w') as f:
  457. f.write(final_json)
  458. return new_pdf
  459. if __name__ == '__main__':
  460. # draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf)
  461. pass