# block_termination_processor.py
import sys
from collections import Counter

from magic_pdf.para.commons import *
  2. if sys.version_info[0] >= 3:
  3. sys.stdout.reconfigure(encoding="utf-8") # type: ignore
  4. class BlockTerminationProcessor:
  5. def __init__(self) -> None:
  6. pass
  7. def _is_consistent_lines(
  8. self,
  9. curr_line,
  10. prev_line,
  11. next_line,
  12. consistent_direction, # 0 for prev, 1 for next, 2 for both
  13. ):
  14. """
  15. This function checks if the line is consistent with its neighbors
  16. Parameters
  17. ----------
  18. curr_line : dict
  19. current line
  20. prev_line : dict
  21. previous line
  22. next_line : dict
  23. next line
  24. consistent_direction : int
  25. 0 for prev, 1 for next, 2 for both
  26. Returns
  27. -------
  28. bool
  29. True if the line is consistent with its neighbors, False otherwise.
  30. """
  31. curr_line_font_size = curr_line["spans"][0]["size"]
  32. curr_line_font_type = curr_line["spans"][0]["font"].lower()
  33. if consistent_direction == 0:
  34. if prev_line:
  35. prev_line_font_size = prev_line["spans"][0]["size"]
  36. prev_line_font_type = prev_line["spans"][0]["font"].lower()
  37. return curr_line_font_size == prev_line_font_size and curr_line_font_type == prev_line_font_type
  38. else:
  39. return False
  40. elif consistent_direction == 1:
  41. if next_line:
  42. next_line_font_size = next_line["spans"][0]["size"]
  43. next_line_font_type = next_line["spans"][0]["font"].lower()
  44. return curr_line_font_size == next_line_font_size and curr_line_font_type == next_line_font_type
  45. else:
  46. return False
  47. elif consistent_direction == 2:
  48. if prev_line and next_line:
  49. prev_line_font_size = prev_line["spans"][0]["size"]
  50. prev_line_font_type = prev_line["spans"][0]["font"].lower()
  51. next_line_font_size = next_line["spans"][0]["size"]
  52. next_line_font_type = next_line["spans"][0]["font"].lower()
  53. return (curr_line_font_size == prev_line_font_size and curr_line_font_type == prev_line_font_type) and (
  54. curr_line_font_size == next_line_font_size and curr_line_font_type == next_line_font_type
  55. )
  56. else:
  57. return False
  58. else:
  59. return False
  60. def _is_regular_line(self, curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_line_height):
  61. """
  62. This function checks if the line is a regular line
  63. Parameters
  64. ----------
  65. curr_line_bbox : list
  66. bbox of the current line
  67. prev_line_bbox : list
  68. bbox of the previous line
  69. next_line_bbox : list
  70. bbox of the next line
  71. avg_char_width : float
  72. average of char widths
  73. X0 : float
  74. median of x0 values, which represents the left average boundary of the page
  75. X1 : float
  76. median of x1 values, which represents the right average boundary of the page
  77. avg_line_height : float
  78. average of line heights
  79. Returns
  80. -------
  81. bool
  82. True if the line is a regular line, False otherwise.
  83. """
  84. horizontal_ratio = 0.5
  85. vertical_ratio = 0.5
  86. horizontal_thres = horizontal_ratio * avg_char_width
  87. vertical_thres = vertical_ratio * avg_line_height
  88. x0, y0, x1, y1 = curr_line_bbox
  89. x0_near_X0 = abs(x0 - X0) < horizontal_thres
  90. x1_near_X1 = abs(x1 - X1) < horizontal_thres
  91. prev_line_is_end_of_para = prev_line_bbox and (abs(prev_line_bbox[2] - X1) > avg_char_width)
  92. sufficient_spacing_above = False
  93. if prev_line_bbox:
  94. vertical_spacing_above = y1 - prev_line_bbox[3]
  95. sufficient_spacing_above = vertical_spacing_above > vertical_thres
  96. sufficient_spacing_below = False
  97. if next_line_bbox:
  98. vertical_spacing_below = next_line_bbox[1] - y0
  99. sufficient_spacing_below = vertical_spacing_below > vertical_thres
  100. return (
  101. (sufficient_spacing_above or sufficient_spacing_below)
  102. or (not x0_near_X0 and not x1_near_X1)
  103. or prev_line_is_end_of_para
  104. )
  105. def _is_possible_start_of_para(self, curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size):
  106. """
  107. This function checks if the line is a possible start of a paragraph
  108. Parameters
  109. ----------
  110. curr_line : dict
  111. current line
  112. prev_line : dict
  113. previous line
  114. next_line : dict
  115. next line
  116. X0 : float
  117. median of x0 values, which represents the left average boundary of the page
  118. X1 : float
  119. median of x1 values, which represents the right average boundary of the page
  120. avg_char_width : float
  121. average of char widths
  122. avg_line_height : float
  123. average of line heights
  124. Returns
  125. -------
  126. bool
  127. True if the line is a possible start of a paragraph, False otherwise.
  128. """
  129. start_confidence = 0.5 # Initial confidence of the line being a start of a paragraph
  130. decision_path = [] # Record the decision path
  131. curr_line_bbox = curr_line["bbox"]
  132. prev_line_bbox = prev_line["bbox"] if prev_line else None
  133. next_line_bbox = next_line["bbox"] if next_line else None
  134. indent_ratio = 1
  135. vertical_ratio = 1.5
  136. vertical_thres = vertical_ratio * avg_font_size
  137. left_horizontal_ratio = 0.5
  138. left_horizontal_thres = left_horizontal_ratio * avg_char_width
  139. right_horizontal_ratio = 2.5
  140. right_horizontal_thres = right_horizontal_ratio * avg_char_width
  141. x0, y0, x1, y1 = curr_line_bbox
  142. indent_condition = x0 > X0 + indent_ratio * avg_char_width
  143. if indent_condition:
  144. start_confidence += 0.2
  145. decision_path.append("indent_condition_met")
  146. x0_near_X0 = abs(x0 - X0) < left_horizontal_thres
  147. if x0_near_X0:
  148. start_confidence += 0.1
  149. decision_path.append("x0_near_X0")
  150. x1_near_X1 = abs(x1 - X1) < right_horizontal_thres
  151. if x1_near_X1:
  152. start_confidence += 0.1
  153. decision_path.append("x1_near_X1")
  154. if prev_line is None:
  155. prev_line_is_end_of_para = True
  156. start_confidence += 0.2
  157. decision_path.append("no_prev_line")
  158. else:
  159. prev_line_is_end_of_para, _, _ = self._is_possible_end_of_para(prev_line, next_line, X0, X1, avg_char_width)
  160. if prev_line_is_end_of_para:
  161. start_confidence += 0.1
  162. decision_path.append("prev_line_is_end_of_para")
  163. sufficient_spacing_above = False
  164. if prev_line_bbox:
  165. vertical_spacing_above = y1 - prev_line_bbox[3]
  166. sufficient_spacing_above = vertical_spacing_above > vertical_thres
  167. if sufficient_spacing_above:
  168. start_confidence += 0.2
  169. decision_path.append("sufficient_spacing_above")
  170. sufficient_spacing_below = False
  171. if next_line_bbox:
  172. vertical_spacing_below = next_line_bbox[1] - y0
  173. sufficient_spacing_below = vertical_spacing_below > vertical_thres
  174. if sufficient_spacing_below:
  175. start_confidence += 0.2
  176. decision_path.append("sufficient_spacing_below")
  177. is_regular_line = self._is_regular_line(
  178. curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_font_size
  179. )
  180. if is_regular_line:
  181. start_confidence += 0.1
  182. decision_path.append("is_regular_line")
  183. is_start_of_para = (
  184. (sufficient_spacing_above or sufficient_spacing_below)
  185. or (indent_condition)
  186. or (not indent_condition and x0_near_X0 and x1_near_X1 and not is_regular_line)
  187. or prev_line_is_end_of_para
  188. )
  189. return (is_start_of_para, start_confidence, decision_path)
  190. def _is_possible_end_of_para(self, curr_line, next_line, X0, X1, avg_char_width):
  191. """
  192. This function checks if the line is a possible end of a paragraph
  193. Parameters
  194. ----------
  195. curr_line : dict
  196. current line
  197. next_line : dict
  198. next line
  199. X0 : float
  200. median of x0 values, which represents the left average boundary of the page
  201. X1 : float
  202. median of x1 values, which represents the right average boundary of the page
  203. avg_char_width : float
  204. average of char widths
  205. Returns
  206. -------
  207. bool
  208. True if the line is a possible end of a paragraph, False otherwise.
  209. """
  210. end_confidence = 0.5 # Initial confidence of the line being a end of a paragraph
  211. decision_path = [] # Record the decision path
  212. curr_line_bbox = curr_line["bbox"]
  213. next_line_bbox = next_line["bbox"] if next_line else None
  214. left_horizontal_ratio = 0.5
  215. right_horizontal_ratio = 0.5
  216. x0, _, x1, y1 = curr_line_bbox
  217. next_x0, next_y0, _, _ = next_line_bbox if next_line_bbox else (0, 0, 0, 0)
  218. x0_near_X0 = abs(x0 - X0) < left_horizontal_ratio * avg_char_width
  219. if x0_near_X0:
  220. end_confidence += 0.1
  221. decision_path.append("x0_near_X0")
  222. x1_smaller_than_X1 = x1 < X1 - right_horizontal_ratio * avg_char_width
  223. if x1_smaller_than_X1:
  224. end_confidence += 0.1
  225. decision_path.append("x1_smaller_than_X1")
  226. next_line_is_start_of_para = (
  227. next_line_bbox
  228. and (next_x0 > X0 + left_horizontal_ratio * avg_char_width)
  229. and (not is_line_left_aligned_from_neighbors(curr_line_bbox, None, next_line_bbox, avg_char_width, direction=1))
  230. )
  231. if next_line_is_start_of_para:
  232. end_confidence += 0.2
  233. decision_path.append("next_line_is_start_of_para")
  234. is_line_left_aligned_from_neighbors_bool = is_line_left_aligned_from_neighbors(
  235. curr_line_bbox, None, next_line_bbox, avg_char_width
  236. )
  237. if is_line_left_aligned_from_neighbors_bool:
  238. end_confidence += 0.1
  239. decision_path.append("line_is_left_aligned_from_neighbors")
  240. is_line_right_aligned_from_neighbors_bool = is_line_right_aligned_from_neighbors(
  241. curr_line_bbox, None, next_line_bbox, avg_char_width
  242. )
  243. if not is_line_right_aligned_from_neighbors_bool:
  244. end_confidence += 0.1
  245. decision_path.append("line_is_not_right_aligned_from_neighbors")
  246. is_end_of_para = end_with_punctuation(curr_line["text"]) and (
  247. (x0_near_X0 and x1_smaller_than_X1)
  248. or (is_line_left_aligned_from_neighbors_bool and not is_line_right_aligned_from_neighbors_bool)
  249. )
  250. return (is_end_of_para, end_confidence, decision_path)
  251. def _cut_paras_per_block(
  252. self,
  253. block,
  254. ):
  255. """
  256. Processes a raw block from PyMuPDF and returns the processed block.
  257. Parameters
  258. ----------
  259. raw_block : dict
  260. A raw block from pymupdf.
  261. Returns
  262. -------
  263. processed_block : dict
  264. """
  265. def _construct_para(lines, is_block_title, para_title_level):
  266. """
  267. Construct a paragraph from given lines.
  268. """
  269. font_sizes = [span["size"] for line in lines for span in line["spans"]]
  270. avg_font_size = sum(font_sizes) / len(font_sizes) if font_sizes else 0
  271. font_colors = [span["color"] for line in lines for span in line["spans"]]
  272. most_common_font_color = max(set(font_colors), key=font_colors.count) if font_colors else None
  273. # font_types = [span["font"] for line in lines for span in line["spans"]]
  274. # most_common_font_type = max(set(font_types), key=font_types.count) if font_types else None
  275. font_type_lengths = {}
  276. for line in lines:
  277. for span in line["spans"]:
  278. font_type = span["font"]
  279. bbox_width = span["bbox"][2] - span["bbox"][0]
  280. if font_type in font_type_lengths:
  281. font_type_lengths[font_type] += bbox_width
  282. else:
  283. font_type_lengths[font_type] = bbox_width
  284. # get the font type with the longest bbox width
  285. most_common_font_type = max(font_type_lengths, key=font_type_lengths.get) if font_type_lengths else None # type: ignore
  286. para_bbox = calculate_para_bbox(lines)
  287. para_text = " ".join(line["text"] for line in lines)
  288. return {
  289. "para_bbox": para_bbox,
  290. "para_text": para_text,
  291. "para_font_type": most_common_font_type,
  292. "para_font_size": avg_font_size,
  293. "para_font_color": most_common_font_color,
  294. "is_para_title": is_block_title,
  295. "para_title_level": para_title_level,
  296. }
  297. block_bbox = block["bbox"]
  298. block_text = block["text"]
  299. block_lines = block["lines"]
  300. X0 = safe_get(block, "X0", 0)
  301. X1 = safe_get(block, "X1", 0)
  302. avg_char_width = safe_get(block, "avg_char_width", 0)
  303. avg_char_height = safe_get(block, "avg_char_height", 0)
  304. avg_font_size = safe_get(block, "avg_font_size", 0)
  305. is_block_title = safe_get(block, "is_block_title", False)
  306. para_title_level = safe_get(block, "block_title_level", 0)
  307. # Segment into paragraphs
  308. para_ranges = []
  309. in_paragraph = False
  310. start_idx_of_para = None
  311. # Create the processed paragraphs
  312. processed_paras = {}
  313. para_bboxes = []
  314. end_idx_of_para = 0
  315. for line_index, line in enumerate(block_lines):
  316. curr_line = line
  317. prev_line = block_lines[line_index - 1] if line_index > 0 else None
  318. next_line = block_lines[line_index + 1] if line_index < len(block_lines) - 1 else None
  319. """
  320. Start processing paragraphs.
  321. """
  322. # Check if the line is the start of a paragraph
  323. is_start_of_para, start_confidence, decision_path = self._is_possible_start_of_para(
  324. curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size
  325. )
  326. if not in_paragraph and is_start_of_para:
  327. in_paragraph = True
  328. start_idx_of_para = line_index
  329. # print_green(">>> Start of a paragraph")
  330. # print(" curr_line_text: ", curr_line["text"])
  331. # print(" start_confidence: ", start_confidence)
  332. # print(" decision_path: ", decision_path)
  333. # Check if the line is the end of a paragraph
  334. is_end_of_para, end_confidence, decision_path = self._is_possible_end_of_para(
  335. curr_line, next_line, X0, X1, avg_char_width
  336. )
  337. if in_paragraph and (is_end_of_para or not next_line):
  338. para_ranges.append((start_idx_of_para, line_index))
  339. start_idx_of_para = None
  340. in_paragraph = False
  341. # print_red(">>> End of a paragraph")
  342. # print(" curr_line_text: ", curr_line["text"])
  343. # print(" end_confidence: ", end_confidence)
  344. # print(" decision_path: ", decision_path)
  345. # Add the last paragraph if it is not added
  346. if in_paragraph and start_idx_of_para is not None:
  347. para_ranges.append((start_idx_of_para, len(block_lines) - 1))
  348. # Process the matched paragraphs
  349. for para_index, (start_idx, end_idx) in enumerate(para_ranges):
  350. matched_lines = block_lines[start_idx : end_idx + 1]
  351. para_properties = _construct_para(matched_lines, is_block_title, para_title_level)
  352. para_key = f"para_{len(processed_paras)}"
  353. processed_paras[para_key] = para_properties
  354. para_bboxes.append(para_properties["para_bbox"])
  355. end_idx_of_para = end_idx + 1
  356. # Deal with the remaining lines
  357. if end_idx_of_para < len(block_lines):
  358. unmatched_lines = block_lines[end_idx_of_para:]
  359. unmatched_properties = _construct_para(unmatched_lines, is_block_title, para_title_level)
  360. unmatched_key = f"para_{len(processed_paras)}"
  361. processed_paras[unmatched_key] = unmatched_properties
  362. para_bboxes.append(unmatched_properties["para_bbox"])
  363. block["paras"] = processed_paras
  364. return block
  365. def batch_process_blocks(self, pdf_dict):
  366. """
  367. Parses the blocks of all pages.
  368. Parameters
  369. ----------
  370. pdf_dict : dict
  371. PDF dictionary.
  372. filter_blocks : list
  373. List of bounding boxes to filter.
  374. Returns
  375. -------
  376. result_dict : dict
  377. Result dictionary.
  378. """
  379. num_paras = 0
  380. for page_id, page in pdf_dict.items():
  381. if page_id.startswith("page_"):
  382. para_blocks = []
  383. if "para_blocks" in page.keys():
  384. input_blocks = page["para_blocks"]
  385. for input_block in input_blocks:
  386. new_block = self._cut_paras_per_block(input_block)
  387. para_blocks.append(new_block)
  388. num_paras += len(new_block["paras"])
  389. page["para_blocks"] = para_blocks
  390. pdf_dict["statistics"]["num_paras"] = num_paras
  391. return pdf_dict