denoise.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import math
  2. from collections import defaultdict
  3. from magic_pdf.para.commons import *
  4. if sys.version_info[0] >= 3:
  5. sys.stdout.reconfigure(encoding="utf-8") # type: ignore
  6. class HeaderFooterProcessor:
  7. def __init__(self) -> None:
  8. pass
  9. def get_most_common_bboxes(self, bboxes, page_height, position="top", threshold=0.25, num_bboxes=3, min_frequency=2):
  10. """
  11. This function gets the most common bboxes from the bboxes
  12. Parameters
  13. ----------
  14. bboxes : list
  15. bboxes
  16. page_height : float
  17. height of the page
  18. position : str, optional
  19. "top" or "bottom", by default "top"
  20. threshold : float, optional
  21. threshold, by default 0.25
  22. num_bboxes : int, optional
  23. number of bboxes to return, by default 3
  24. min_frequency : int, optional
  25. minimum frequency of the bbox, by default 2
  26. Returns
  27. -------
  28. common_bboxes : list
  29. common bboxes
  30. """
  31. # Filter bbox by position
  32. if position == "top":
  33. filtered_bboxes = [bbox for bbox in bboxes if bbox[1] < page_height * threshold]
  34. else:
  35. filtered_bboxes = [bbox for bbox in bboxes if bbox[3] > page_height * (1 - threshold)]
  36. # Find the most common bbox
  37. bbox_count = defaultdict(int)
  38. for bbox in filtered_bboxes:
  39. bbox_count[tuple(bbox)] += 1
  40. # Get the most frequently occurring bbox, but only consider it when the frequency exceeds min_frequency
  41. common_bboxes = [
  42. bbox for bbox, count in sorted(bbox_count.items(), key=lambda item: item[1], reverse=True) if count >= min_frequency
  43. ][:num_bboxes]
  44. return common_bboxes
  45. def detect_footer_header(self, result_dict, similarity_threshold=0.5):
  46. """
  47. This function detects the header and footer of the document.
  48. Parameters
  49. ----------
  50. result_dict : dict
  51. result dictionary
  52. Returns
  53. -------
  54. result_dict : dict
  55. result dictionary
  56. """
  57. def compare_bbox_with_list(bbox, bbox_list, tolerance=1):
  58. return any(all(abs(a - b) < tolerance for a, b in zip(bbox, common_bbox)) for common_bbox in bbox_list)
  59. def is_single_line_block(block):
  60. # Determine based on the width and height of the block
  61. block_width = block["X1"] - block["X0"]
  62. block_height = block["bbox"][3] - block["bbox"][1]
  63. # If the height of the block is close to the average character height and the width is large, it is considered a single line
  64. return block_height <= block["avg_char_height"] * 3 and block_width > block["avg_char_width"] * 3
  65. # Traverse all blocks in the document
  66. single_preproc_blocks = 0
  67. total_blocks = 0
  68. single_preproc_blocks = 0
  69. for page_id, blocks in result_dict.items():
  70. if page_id.startswith("page_"):
  71. for block_key, block in blocks.items():
  72. if block_key.startswith("block_"):
  73. total_blocks += 1
  74. if is_single_line_block(block):
  75. single_preproc_blocks += 1
  76. # If there are no blocks, skip the header and footer detection
  77. if total_blocks == 0:
  78. print("No blocks found. Skipping header/footer detection.")
  79. return result_dict
  80. # If most of the blocks are single-line, skip the header and footer detection
  81. if single_preproc_blocks / total_blocks > 0.5: # 50% of the blocks are single-line
  82. return result_dict
  83. # Collect the bounding boxes of all blocks
  84. all_bboxes = []
  85. all_texts = []
  86. for page_id, blocks in result_dict.items():
  87. if page_id.startswith("page_"):
  88. for block_key, block in blocks.items():
  89. if block_key.startswith("block_"):
  90. all_bboxes.append(block["bbox"])
  91. # Get the height of the page
  92. page_height = max(bbox[3] for bbox in all_bboxes)
  93. # Get the most common bbox lists for headers and footers
  94. common_header_bboxes = self.get_most_common_bboxes(all_bboxes, page_height, position="top") if all_bboxes else []
  95. common_footer_bboxes = self.get_most_common_bboxes(all_bboxes, page_height, position="bottom") if all_bboxes else []
  96. # Detect and mark headers and footers
  97. for page_id, blocks in result_dict.items():
  98. if page_id.startswith("page_"):
  99. for block_key, block in blocks.items():
  100. if block_key.startswith("block_"):
  101. bbox = block["bbox"]
  102. text = block["text"]
  103. is_header = compare_bbox_with_list(bbox, common_header_bboxes)
  104. is_footer = compare_bbox_with_list(bbox, common_footer_bboxes)
  105. block["is_header"] = int(is_header)
  106. block["is_footer"] = int(is_footer)
  107. return result_dict
  108. class NonHorizontalTextProcessor:
  109. def __init__(self) -> None:
  110. pass
  111. def detect_non_horizontal_texts(self, result_dict):
  112. """
  113. This function detects watermarks and vertical margin notes in the document.
  114. Watermarks are identified by finding blocks with the same coordinates and frequently occurring identical texts across multiple pages.
  115. If these conditions are met, the blocks are highly likely to be watermarks, as opposed to headers or footers, which can change from page to page.
  116. If the direction of these blocks is not horizontal, they are definitely considered to be watermarks.
  117. Vertical margin notes are identified by finding blocks with the same coordinates and frequently occurring identical texts across multiple pages.
  118. If these conditions are met, the blocks are highly likely to be vertical margin notes, which typically appear on the left and right sides of the page.
  119. If the direction of these blocks is vertical, they are definitely considered to be vertical margin notes.
  120. Parameters
  121. ----------
  122. result_dict : dict
  123. The result dictionary.
  124. Returns
  125. -------
  126. result_dict : dict
  127. The updated result dictionary.
  128. """
  129. # Dictionary to store information about potential watermarks
  130. potential_watermarks = {}
  131. potential_margin_notes = {}
  132. for page_id, page_content in result_dict.items():
  133. if page_id.startswith("page_"):
  134. for block_id, block_data in page_content.items():
  135. if block_id.startswith("block_"):
  136. if "dir" in block_data:
  137. coordinates_text = (block_data["bbox"], block_data["text"]) # Tuple of coordinates and text
  138. angle = math.atan2(block_data["dir"][1], block_data["dir"][0])
  139. angle = abs(math.degrees(angle))
  140. if angle > 5 and angle < 85: # Check if direction is watermarks
  141. if coordinates_text in potential_watermarks:
  142. potential_watermarks[coordinates_text] += 1
  143. else:
  144. potential_watermarks[coordinates_text] = 1
  145. if angle > 85 and angle < 105: # Check if direction is vertical
  146. if coordinates_text in potential_margin_notes:
  147. potential_margin_notes[coordinates_text] += 1 # Increment count
  148. else:
  149. potential_margin_notes[coordinates_text] = 1 # Initialize count
  150. # Identify watermarks by finding entries with counts higher than a threshold (e.g., appearing on more than half of the pages)
  151. watermark_threshold = len(result_dict) // 2
  152. watermarks = {k: v for k, v in potential_watermarks.items() if v > watermark_threshold}
  153. # Identify margin notes by finding entries with counts higher than a threshold (e.g., appearing on more than half of the pages)
  154. margin_note_threshold = len(result_dict) // 2
  155. margin_notes = {k: v for k, v in potential_margin_notes.items() if v > margin_note_threshold}
  156. # Add watermark information to the result dictionary
  157. for page_id, blocks in result_dict.items():
  158. if page_id.startswith("page_"):
  159. for block_id, block_data in blocks.items():
  160. coordinates_text = (block_data["bbox"], block_data["text"])
  161. if coordinates_text in watermarks:
  162. block_data["is_watermark"] = 1
  163. else:
  164. block_data["is_watermark"] = 0
  165. if coordinates_text in margin_notes:
  166. block_data["is_vertical_margin_note"] = 1
  167. else:
  168. block_data["is_vertical_margin_note"] = 0
  169. return result_dict
  170. class NoiseRemover:
  171. def __init__(self) -> None:
  172. pass
  173. def skip_data_noises(self, result_dict):
  174. """
  175. This function skips the data noises, including overlap blocks, header, footer, watermark, vertical margin note, title
  176. """
  177. filtered_result_dict = {}
  178. for page_id, blocks in result_dict.items():
  179. if page_id.startswith("page_"):
  180. filtered_blocks = {}
  181. for block_id, block in blocks.items():
  182. if block_id.startswith("block_"):
  183. if any(
  184. block.get(key, 0)
  185. for key in [
  186. "is_overlap",
  187. "is_header",
  188. "is_footer",
  189. "is_watermark",
  190. "is_vertical_margin_note",
  191. "is_block_title",
  192. ]
  193. ):
  194. continue
  195. filtered_blocks[block_id] = block
  196. if filtered_blocks:
  197. filtered_result_dict[page_id] = filtered_blocks
  198. return filtered_result_dict