# equations_replace.py
"""
Replace the formulas found in the structure returned by PyMuPDF with the
formula results recognized by the model.
"""
from magic_pdf.libs.commons import fitz
import json
import os
from pathlib import Path
from loguru import logger
from magic_pdf.libs.ocr_content_type import ContentType

# Markers written into fabricated spans for inline and interline equations.
TYPE_INLINE_EQUATION = ContentType.InlineEquation
TYPE_INTERLINE_EQUATION = ContentType.InterlineEquation
  12. def combine_chars_to_pymudict(block_dict, char_dict):
  13. """
  14. 把block级别的pymupdf 结构里加入char结构
  15. """
  16. # 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充
  17. char_map = {tuple(item["bbox"]): item for item in char_dict}
  18. for i in range(len(block_dict)): # blcok
  19. block = block_dict[i]
  20. key = block["bbox"]
  21. char_dict_item = char_map[tuple(key)]
  22. char_dict_map = {tuple(item["bbox"]): item for item in char_dict_item["lines"]}
  23. for j in range(len(block["lines"])):
  24. lines = block["lines"][j]
  25. with_char_lines = char_dict_map[lines["bbox"]]
  26. for k in range(len(lines["spans"])):
  27. spans = lines["spans"][k]
  28. try:
  29. chars = with_char_lines["spans"][k]["chars"]
  30. except Exception as e:
  31. logger.error(char_dict[i]["lines"][j])
  32. spans["chars"] = chars
  33. return block_dict
  34. def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox):
  35. """
  36. 计算box1和box2的重叠面积占最小面积的box的比例
  37. """
  38. # Determine the coordinates of the intersection rectangle
  39. x_left = max(bbox1[0], min_bbox[0])
  40. y_top = max(bbox1[1], min_bbox[1])
  41. x_right = min(bbox1[2], min_bbox[2])
  42. y_bottom = min(bbox1[3], min_bbox[3])
  43. if x_right < x_left or y_bottom < y_top:
  44. return 0.0
  45. # The area of overlap area
  46. intersection_area = (x_right - x_left) * (y_bottom - y_top)
  47. min_box_area = (min_bbox[3] - min_bbox[1]) * (min_bbox[2] - min_bbox[0])
  48. if min_box_area == 0:
  49. return 0
  50. else:
  51. return intersection_area / min_box_area
  52. def _is_xin(bbox1, bbox2):
  53. area1 = abs(bbox1[2] - bbox1[0]) * abs(bbox1[3] - bbox1[1])
  54. area2 = abs(bbox2[2] - bbox2[0]) * abs(bbox2[3] - bbox2[1])
  55. if area1 < area2:
  56. ratio = calculate_overlap_area_2_minbox_area_ratio(bbox2, bbox1)
  57. else:
  58. ratio = calculate_overlap_area_2_minbox_area_ratio(bbox1, bbox2)
  59. return ratio > 0.6
  60. def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
  61. """消除掉整个块都在行间公式块内部的文本块"""
  62. for eq_bbox in interline_bboxes:
  63. removed_txt_blk = []
  64. for text_blk in text_blocks:
  65. text_bbox = text_blk["bbox"]
  66. if (
  67. calculate_overlap_area_2_minbox_area_ratio(eq_bbox["bbox"], text_bbox)
  68. >= 0.7
  69. ):
  70. removed_txt_blk.append(text_blk)
  71. for blk in removed_txt_blk:
  72. text_blocks.remove(blk)
  73. return text_blocks
  74. def _is_in_or_part_overlap(box1, box2) -> bool:
  75. """
  76. 两个bbox是否有部分重叠或者包含
  77. """
  78. if box1 is None or box2 is None:
  79. return False
  80. x0_1, y0_1, x1_1, y1_1 = box1
  81. x0_2, y0_2, x1_2, y1_2 = box2
  82. return not (
  83. x1_1 < x0_2 # box1在box2的左边
  84. or x0_1 > x1_2 # box1在box2的右边
  85. or y1_1 < y0_2 # box1在box2的上边
  86. or y0_1 > y1_2
  87. ) # box1在box2的下边
  88. def remove_text_block_overlap_interline_equation_bbox(
  89. interline_eq_bboxes, pymu_block_list
  90. ):
  91. """消除掉行行内公式有部分重叠的文本块的内容。
  92. 同时重新计算消除重叠之后文本块的大小"""
  93. deleted_block = []
  94. for text_block in pymu_block_list:
  95. deleted_line = []
  96. for line in text_block["lines"]:
  97. deleted_span = []
  98. for span in line["spans"]:
  99. deleted_chars = []
  100. for char in span["chars"]:
  101. if any(
  102. [
  103. _is_in_or_part_overlap(char["bbox"], eq_bbox["bbox"])
  104. for eq_bbox in interline_eq_bboxes
  105. ]
  106. ):
  107. deleted_chars.append(char)
  108. # 检查span里没有char则删除这个span
  109. for char in deleted_chars:
  110. span["chars"].remove(char)
  111. # 重新计算这个span的大小
  112. if len(span["chars"]) == 0: # 删除这个span
  113. deleted_span.append(span)
  114. else:
  115. span["bbox"] = (
  116. min([b["bbox"][0] for b in span["chars"]]),
  117. min([b["bbox"][1] for b in span["chars"]]),
  118. max([b["bbox"][2] for b in span["chars"]]),
  119. max([b["bbox"][3] for b in span["chars"]]),
  120. )
  121. # 检查这个span
  122. for span in deleted_span:
  123. line["spans"].remove(span)
  124. if len(line["spans"]) == 0: # 删除这个line
  125. deleted_line.append(line)
  126. else:
  127. line["bbox"] = (
  128. min([b["bbox"][0] for b in line["spans"]]),
  129. min([b["bbox"][1] for b in line["spans"]]),
  130. max([b["bbox"][2] for b in line["spans"]]),
  131. max([b["bbox"][3] for b in line["spans"]]),
  132. )
  133. # 检查这个block是否可以删除
  134. for line in deleted_line:
  135. text_block["lines"].remove(line)
  136. if len(text_block["lines"]) == 0: # 删除block
  137. deleted_block.append(text_block)
  138. else:
  139. text_block["bbox"] = (
  140. min([b["bbox"][0] for b in text_block["lines"]]),
  141. min([b["bbox"][1] for b in text_block["lines"]]),
  142. max([b["bbox"][2] for b in text_block["lines"]]),
  143. max([b["bbox"][3] for b in text_block["lines"]]),
  144. )
  145. # 检查text block删除
  146. for block in deleted_block:
  147. pymu_block_list.remove(block)
  148. if len(pymu_block_list) == 0:
  149. return []
  150. return pymu_block_list
  151. def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
  152. """在行间公式对应的地方插上一个伪造的block"""
  153. for eq in interline_eq_bboxes:
  154. bbox = eq["bbox"]
  155. latex_content = eq["latex"]
  156. text_block = {
  157. "number": len(pymu_block_list),
  158. "type": 0,
  159. "bbox": bbox,
  160. "lines": [
  161. {
  162. "spans": [
  163. {
  164. "size": 9.962599754333496,
  165. "_type": TYPE_INTERLINE_EQUATION,
  166. "flags": 4,
  167. "font": TYPE_INTERLINE_EQUATION,
  168. "color": 0,
  169. "ascender": 0.9409999847412109,
  170. "descender": -0.3050000071525574,
  171. "text": f"\n$$\n{latex_content}\n$$\n",
  172. "origin": [bbox[0], bbox[1]],
  173. "bbox": bbox,
  174. }
  175. ],
  176. "wmode": 0,
  177. "dir": [1.0, 0.0],
  178. "bbox": bbox,
  179. }
  180. ],
  181. }
  182. pymu_block_list.append(text_block)
  183. def x_overlap_ratio(box1, box2):
  184. a, _, c, _ = box1
  185. e, _, g, _ = box2
  186. # 计算重叠宽度
  187. overlap_x = max(min(c, g) - max(a, e), 0)
  188. # 计算box1的宽度
  189. width1 = g - e
  190. # 计算重叠比例
  191. overlap_ratio = overlap_x / width1 if width1 != 0 else 0
  192. return overlap_ratio
  193. def __is_x_dir_overlap(bbox1, bbox2):
  194. return not (bbox1[2] < bbox2[0] or bbox1[0] > bbox2[2])
  195. def __y_overlap_ratio(box1, box2):
  196. """"""
  197. _, b, _, d = box1
  198. _, f, _, h = box2
  199. # 计算重叠高度
  200. overlap_y = max(min(d, h) - max(b, f), 0)
  201. # 计算box1的高度
  202. height1 = d - b
  203. # 计算重叠比例
  204. overlap_ratio = overlap_y / height1 if height1 != 0 else 0
  205. return overlap_ratio
  206. def replace_line_v2(eqinfo, line):
  207. """
  208. 扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。
  209. 最后与这个x0,x1有相交的span0, span1内部进行分割。
  210. """
  211. first_overlap_span = -1
  212. first_overlap_span_idx = -1
  213. last_overlap_span = -1
  214. delete_chars = []
  215. for i in range(0, len(line["spans"])):
  216. if line["spans"][i].get("_type", None) is not None:
  217. continue # 忽略,因为已经是插入的伪造span公式了
  218. for char in line["spans"][i]["chars"]:
  219. if __is_x_dir_overlap(eqinfo["bbox"], char["bbox"]):
  220. line_txt = ""
  221. for span in line["spans"]:
  222. span_txt = "<span>"
  223. for ch in span["chars"]:
  224. span_txt = span_txt + ch["c"]
  225. span_txt = span_txt + "</span>"
  226. line_txt = line_txt + span_txt
  227. if first_overlap_span_idx == -1:
  228. first_overlap_span = line["spans"][i]
  229. first_overlap_span_idx = i
  230. last_overlap_span = line["spans"][i]
  231. delete_chars.append(char)
  232. # 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多
  233. if len(delete_chars) > 0:
  234. ch0_bbox = delete_chars[0]["bbox"]
  235. if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
  236. delete_chars.remove(delete_chars[0])
  237. if len(delete_chars) > 0:
  238. ch0_bbox = delete_chars[-1]["bbox"]
  239. if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
  240. delete_chars.remove(delete_chars[-1])
  241. # 计算x方向上被删除区间内的char的真实x0, x1
  242. if len(delete_chars):
  243. x0, x1 = min([b["bbox"][0] for b in delete_chars]), max(
  244. [b["bbox"][2] for b in delete_chars]
  245. )
  246. else:
  247. logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}")
  248. return False
  249. # 删除位于x0, x1这两个中间的span
  250. delete_span = []
  251. for span in line["spans"]:
  252. span_box = span["bbox"]
  253. if x0 <= span_box[0] and span_box[2] <= x1:
  254. delete_span.append(span)
  255. for span in delete_span:
  256. line["spans"].remove(span)
  257. equation_span = {
  258. "size": 9.962599754333496,
  259. "type": TYPE_INLINE_EQUATION,
  260. "flags": 4,
  261. "font": TYPE_INLINE_EQUATION,
  262. "color": 0,
  263. "ascender": 0.9409999847412109,
  264. "descender": -0.3050000071525574,
  265. "latex": "",
  266. "origin": [337.1410153102337, 216.0205245153934],
  267. "bbox": eqinfo["bbox"]
  268. }
  269. # equation_span = line['spans'][0].copy()
  270. equation_span["latex"] = f" ${eqinfo['latex']}$ "
  271. equation_span["bbox"] = [x0, equation_span["bbox"][1], x1, equation_span["bbox"][3]]
  272. equation_span["origin"] = [equation_span["bbox"][0], equation_span["bbox"][1]]
  273. equation_span["chars"] = delete_chars
  274. equation_span["type"] = TYPE_INLINE_EQUATION
  275. equation_span["_eq_bbox"] = eqinfo["bbox"]
  276. line["spans"].insert(first_overlap_span_idx + 1, equation_span) # 放入公式
  277. # logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】")
  278. # 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置
  279. first_span_chars = [
  280. char
  281. for char in first_overlap_span["chars"]
  282. if (char["bbox"][2] + char["bbox"][0]) / 2 < x0
  283. ]
  284. tail_span_chars = [
  285. char
  286. for char in last_overlap_span["chars"]
  287. if (char["bbox"][0] + char["bbox"][2]) / 2 > x1
  288. ]
  289. if len(first_span_chars) > 0:
  290. first_overlap_span["chars"] = first_span_chars
  291. first_overlap_span["text"] = "".join([char["c"] for char in first_span_chars])
  292. first_overlap_span["bbox"] = (
  293. first_overlap_span["bbox"][0],
  294. first_overlap_span["bbox"][1],
  295. max([chr["bbox"][2] for chr in first_span_chars]),
  296. first_overlap_span["bbox"][3],
  297. )
  298. # first_overlap_span['_type'] = "first"
  299. else:
  300. # 删掉
  301. if first_overlap_span not in delete_span:
  302. line["spans"].remove(first_overlap_span)
  303. if len(tail_span_chars) > 0:
  304. min_of_tail_span_x0 = min([chr["bbox"][0] for chr in tail_span_chars])
  305. min_of_tail_span_y0 = min([chr["bbox"][1] for chr in tail_span_chars])
  306. max_of_tail_span_x1 = max([chr["bbox"][2] for chr in tail_span_chars])
  307. max_of_tail_span_y1 = max([chr["bbox"][3] for chr in tail_span_chars])
  308. if last_overlap_span == first_overlap_span: # 这个时候应该插入一个新的
  309. tail_span_txt = "".join([char["c"] for char in tail_span_chars])
  310. last_span_to_insert = last_overlap_span.copy()
  311. last_span_to_insert["chars"] = tail_span_chars
  312. last_span_to_insert["text"] = "".join(
  313. [char["c"] for char in tail_span_chars]
  314. )
  315. if equation_span["bbox"][2] >= last_overlap_span["bbox"][2]:
  316. last_span_to_insert["bbox"] = (
  317. min_of_tail_span_x0,
  318. min_of_tail_span_y0,
  319. max_of_tail_span_x1,
  320. max_of_tail_span_y1
  321. )
  322. else:
  323. last_span_to_insert["bbox"] = (
  324. min([chr["bbox"][0] for chr in tail_span_chars]),
  325. last_overlap_span["bbox"][1],
  326. last_overlap_span["bbox"][2],
  327. last_overlap_span["bbox"][3],
  328. )
  329. # 插入到公式对象之后
  330. equation_idx = line["spans"].index(equation_span)
  331. line["spans"].insert(equation_idx + 1, last_span_to_insert) # 放入公式
  332. else: # 直接修改原来的span
  333. last_overlap_span["chars"] = tail_span_chars
  334. last_overlap_span["text"] = "".join([char["c"] for char in tail_span_chars])
  335. last_overlap_span["bbox"] = (
  336. min([chr["bbox"][0] for chr in tail_span_chars]),
  337. last_overlap_span["bbox"][1],
  338. last_overlap_span["bbox"][2],
  339. last_overlap_span["bbox"][3],
  340. )
  341. else:
  342. # 删掉
  343. if (
  344. last_overlap_span not in delete_span
  345. and last_overlap_span != first_overlap_span
  346. ):
  347. line["spans"].remove(last_overlap_span)
  348. remain_txt = ""
  349. for span in line["spans"]:
  350. span_txt = "<span>"
  351. for char in span["chars"]:
  352. span_txt = span_txt + char["c"]
  353. span_txt = span_txt + "</span>"
  354. remain_txt = remain_txt + span_txt
  355. # logger.info(f"<== succ replace, text is 【{remain_txt}】, equation is 【{eqinfo['latex_text']}】")
  356. return True
  357. def replace_eq_blk(eqinfo, text_block):
  358. """替换行内公式"""
  359. for line in text_block["lines"]:
  360. line_bbox = line["bbox"]
  361. if (
  362. _is_xin(eqinfo["bbox"], line_bbox)
  363. or __y_overlap_ratio(eqinfo["bbox"], line_bbox) > 0.6
  364. ): # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄,
  365. replace_succ = replace_line_v2(eqinfo, line)
  366. if (
  367. not replace_succ
  368. ): # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
  369. continue
  370. else:
  371. break
  372. else:
  373. return False
  374. return True
  375. def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
  376. """替换行内公式"""
  377. for eqinfo in inline_equation_bboxes:
  378. eqbox = eqinfo["bbox"]
  379. for blk in raw_text_blocks:
  380. if _is_xin(eqbox, blk["bbox"]):
  381. if not replace_eq_blk(eqinfo, blk):
  382. logger.error(f"行内公式没有替换成功:{eqinfo} ")
  383. else:
  384. break
  385. return raw_text_blocks
  386. def remove_chars_in_text_blocks(text_blocks):
  387. """删除text_blocks里的char"""
  388. for blk in text_blocks:
  389. for line in blk["lines"]:
  390. for span in line["spans"]:
  391. _ = span.pop("chars", "no such key")
  392. return text_blocks
  393. def replace_equations_in_textblock(
  394. raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes
  395. ):
  396. """
  397. 替换行间和和行内公式为latex
  398. """
  399. raw_text_blocks = remove_text_block_in_interline_equation_bbox(
  400. interline_equation_bboxes, raw_text_blocks
  401. ) # 消除重叠:第一步,在公式内部的
  402. raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
  403. interline_equation_bboxes, raw_text_blocks
  404. ) # 消重,第二步,和公式覆盖的
  405. insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
  406. raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks)
  407. return raw_text_blocks
  408. def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
  409. """ """
  410. new_pdf = f"{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf"
  411. with open(json_path, "r", encoding="utf-8") as f:
  412. obj = json.loads(f.read())
  413. if os.path.exists(new_pdf):
  414. os.remove(new_pdf)
  415. new_doc = fitz.open("")
  416. doc = fitz.open(pdf_path)
  417. new_doc = fitz.open(pdf_path)
  418. for i in range(len(new_doc)):
  419. page = new_doc[i]
  420. inline_equation_bboxes = obj[f"page_{i}"]["inline_equations"]
  421. interline_equation_bboxes = obj[f"page_{i}"]["interline_equations"]
  422. raw_text_blocks = obj[f"page_{i}"]["preproc_blocks"]
  423. raw_text_blocks = remove_text_block_in_interline_equation_bbox(
  424. interline_equation_bboxes, raw_text_blocks
  425. ) # 消除重叠:第一步,在公式内部的
  426. raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
  427. interline_equation_bboxes, raw_text_blocks
  428. ) # 消重,第二步,和公式覆盖的
  429. insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
  430. raw_text_blocks = replace_inline_equations(
  431. inline_equation_bboxes, raw_text_blocks
  432. )
  433. # 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的
  434. color_map = [fitz.pdfcolor["blue"], fitz.pdfcolor["green"]]
  435. j = 0
  436. for blk in raw_text_blocks:
  437. for i, line in enumerate(blk["lines"]):
  438. # line_box = line['bbox']
  439. # shape = page.new_shape()
  440. # shape.draw_rect(line_box)
  441. # shape.finish(color=fitz.pdfcolor['red'], fill=color_map[j%2], fill_opacity=0.3)
  442. # shape.commit()
  443. # j = j+1
  444. for i, span in enumerate(line["spans"]):
  445. shape_page = page.new_shape()
  446. span_type = span.get("_type")
  447. color = fitz.pdfcolor["blue"]
  448. if span_type == "first":
  449. color = fitz.pdfcolor["blue"]
  450. elif span_type == "tail":
  451. color = fitz.pdfcolor["green"]
  452. elif span_type == TYPE_INLINE_EQUATION:
  453. color = fitz.pdfcolor["black"]
  454. else:
  455. color = None
  456. b = span["bbox"]
  457. shape_page.draw_rect(b)
  458. shape_page.finish(color=None, fill=color, fill_opacity=0.3)
  459. shape_page.commit()
  460. new_doc.save(new_pdf)
  461. logger.info(f"save ok {new_pdf}")
  462. final_json = json.dumps(obj, ensure_ascii=False, indent=2)
  463. with open("equations_test/final_json.json", "w") as f:
  464. f.write(final_json)
  465. return new_pdf
if __name__ == "__main__":
    # Manual debugging entry point, intentionally left disabled:
    # draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf)
    pass